tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-520. Generate splits within the AM. (sseth)
Date Thu, 10 Oct 2013 18:01:45 GMT
Updated Branches:
  refs/heads/master 05fb9fbe5 -> ad5666a9d


TEZ-520. Generate splits within the AM. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ad5666a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ad5666a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ad5666a9

Branch: refs/heads/master
Commit: ad5666a9d88ea26bdf17c14f35898eed58d5d094
Parents: 05fb9fb
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Oct 10 10:57:37 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Oct 10 10:57:37 2013 -0700

----------------------------------------------------------------------
 .../RootInputConfigureVertexTasksEvent.java     |  45 +++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   4 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   4 +-
 .../app/dag/impl/RootInputVertexManager.java    |  16 ++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  69 +++++++++----
 .../mapreduce/examples/FilterLinesByWord.java   |  55 +++++++---
 .../tez/mapreduce/examples/MRRSleepJob.java     |  85 ++++++++++------
 .../mapreduce/examples/OrderedWordCount.java    |  78 +++++++++++----
 .../helpers/SplitsInClientOptionParser.java     |  72 +++++++++++++
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  77 +++++++++-----
 .../common/MRInputAMSplitGenerator.java         | 100 +++++++++++++++++++
 .../common/MRInputSplitDistributor.java         |  24 +++--
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |  31 ++++--
 .../tez/mapreduce/hadoop/MRJobConfig.java       |   2 +-
 .../org/apache/tez/mapreduce/input/MRInput.java |   4 +-
 .../src/main/proto/MRRuntimeProtos.proto        |   2 +-
 .../org/apache/tez/mapreduce/YARNRunner.java    |   2 +-
 17 files changed, 537 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputConfigureVertexTasksEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputConfigureVertexTasksEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputConfigureVertexTasksEvent.java
new file mode 100644
index 0000000..1eb7f14
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputConfigureVertexTasksEvent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import java.util.List;
+
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.runtime.api.Event;
+
+public class RootInputConfigureVertexTasksEvent extends Event {
+
+  private final int numTasks;
+  private final List<TaskLocationHint> taskLocationHints;
+  
+  public RootInputConfigureVertexTasksEvent(int numTasks, List<TaskLocationHint> locationHints) {
+    this.numTasks = numTasks;
+    this.taskLocationHints = locationHints;
+  }
+
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  public List<TaskLocationHint> getTaskLocationHints() {
+    return taskLocationHints;
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 217d86d..4877f35 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -146,7 +146,7 @@ import org.apache.tez.runtime.library.processor.SleepProcessor;
 
 @SuppressWarnings("rawtypes")
 public class DAGAppMaster extends AbstractService {
-
+ 
   private static final Log LOG = LogFactory.getLog(DAGAppMaster.class);
 
   /**
@@ -1340,7 +1340,7 @@ public class DAGAppMaster extends AbstractService {
 
       long appSubmitTime = Long.parseLong(appSubmitTimeStr);
 
-      Configuration conf = new Configuration(new YarnConfiguration());
+      final Configuration conf = new Configuration(new YarnConfiguration());
       TezUtils.addUserSpecifiedTezConfiguration(conf);
 
       String jobUserName = System

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 2aaabab..caab317 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -27,6 +27,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -69,7 +70,8 @@ public interface Vertex extends Comparable<Vertex> {
   ProgressBuilder getVertexProgress();
   VertexStatusBuilder getVertexStatus();
 
-  void setParallelism(int parallelism,Map<Vertex, EdgeManager> sourceEdgeManagers);
+  void setParallelism(int parallelism, Map<Vertex, EdgeManager> sourceEdgeManagers);
+  void setVertexLocationHint(VertexLocationHint vertexLocationHint);
 
   // CHANGE THESE TO LISTS AND MAINTAIN ORDER?
   void setInputVertices(Map<Vertex, Edge> inVertices);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index 4329192..db0b009 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.app.dag.VertexState;
@@ -32,6 +33,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -49,7 +51,7 @@ public class RootInputVertexManager implements VertexScheduler {
   private final Map<String, EventMetaData> destInfoMap;
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
-
+  
   @SuppressWarnings("rawtypes")
   public RootInputVertexManager(Vertex vertex, EventHandler eventHandler) {
     this.managedVertex = vertex;
@@ -91,6 +93,18 @@ public class RootInputVertexManager implements VertexScheduler {
     Preconditions.checkState(EnumSet.of(VertexState.INITIALIZING,
         VertexState.NEW).contains(managedVertex.getState()));
     for (Event event : events) {
+      if (event instanceof RootInputConfigureVertexTasksEvent) {
+        // No tasks should have been started yet. Checked by initial state check.
+        Preconditions.checkState(dataInformationEventSeen == false);
+        Preconditions
+            .checkState(
+                managedVertex.getTotalTasks() == -1,
+                "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
+        RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event;
+        managedVertex.setParallelism(cEvent.getNumTasks(), null);
+        managedVertex.setVertexLocationHint(new VertexLocationHint(cEvent
+            .getNumTasks(), cEvent.getTaskLocationHints()));
+      }
       if (event instanceof RootInputUpdatePayloadEvent) {
         // No tasks should have been started yet. Checked by initial state check.
         Preconditions.checkState(dataInformationEventSeen == false);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5f54ad5..8433477 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -412,6 +412,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private RootInputInitializerRunner rootInputInitializer;
   
   private VertexScheduler vertexScheduler;
+  
+  private boolean parallelismSet = false;
 
   private VertexOutputCommitter committer;
   private AtomicBoolean committed = new AtomicBoolean(false);
@@ -748,6 +750,24 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       Map<Vertex, EdgeManager> sourceEdgeManagers) {
     writeLock.lock();
     try {
+      Preconditions.checkState(parallelismSet == false,
+          "Parallelism can only be set dynamically once per vertex");
+      parallelismSet = true;
+      
+      // Input initializer expected to set parallelism.
+      if (numTasks == -1) {
+        Preconditions
+            .checkArgument(sourceEdgeManagers == null,
+                "SourceEdge managers cannot be set when determining initial parallelism");
+        this.numTasks = parallelism;
+        this.createTasks();
+        LOG.info("Parallelism set to : " + this.numTasks);
+        // Pending task event management, which follows, is not required.
+        // Vertex event buffering is happening elsewhere - while in the Vertex
+        // INITIALIZING state.
+        return;
+      }
+      
       if (parallelism >= numTasks) {
         // not that hard to support perhaps. but checking right now since there
         // is no use case for it and checking may catch other bugs.
@@ -832,6 +852,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     
   }
 
+  public void setVertexLocationHint(VertexLocationHint vertexLocationHint) {
+    writeLock.lock();
+    try {
+      this.vertexLocationHint = vertexLocationHint;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   @Override
   /**
    * The only entry point to change the Vertex.
@@ -1092,34 +1121,34 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // no code, for now
   }
   
-  private void createTasks(VertexImpl vertex) {
-    Configuration conf = vertex.conf;
+  private void createTasks() {
+    Configuration conf = this.conf;
     boolean useNullLocationHint = true;
-    if (vertex.vertexLocationHint != null
-        && vertex.vertexLocationHint.getTaskLocationHints() != null
-        && vertex.vertexLocationHint.getTaskLocationHints().size() ==
-            vertex.numTasks) {
+    if (this.vertexLocationHint != null
+        && this.vertexLocationHint.getTaskLocationHints() != null
+        && this.vertexLocationHint.getTaskLocationHints().size() ==
+            this.numTasks) {
       useNullLocationHint = false;
     }
-    for (int i=0; i < vertex.numTasks; ++i) {
+    for (int i=0; i < this.numTasks; ++i) {
       TaskLocationHint locHint = null;
       if (!useNullLocationHint) {
-        locHint = vertex.vertexLocationHint.getTaskLocationHints().get(i);
+        locHint = this.vertexLocationHint.getTaskLocationHints().get(i);
       }
       TaskImpl task =
-          new TaskImpl(vertex.getVertexId(), i,
-              vertex.eventHandler,
+          new TaskImpl(this.getVertexId(), i,
+              this.eventHandler,
               conf,
-              vertex.taskAttemptListener,
-              vertex.clock,
-              vertex.taskHeartbeatHandler,
-              vertex.appContext,
-              vertex.targetVertices.isEmpty(),
-              locHint, vertex.taskResource,
-              vertex.containerContext);
-      vertex.addTask(task);
+              this.taskAttemptListener,
+              this.clock,
+              this.taskHeartbeatHandler,
+              this.appContext,
+              this.targetVertices.isEmpty(),
+              locHint, this.taskResource,
+              this.containerContext);
+      this.addTask(task);
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Created task for vertex " + vertex.getVertexId() + ": " +
+        LOG.debug("Created task for vertex " + this.getVertexId() + ": " +
             task.getTaskId());
       }
     }
@@ -1207,7 +1236,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (vertex.numTasks == -1) {
         LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers to set #tasks for the vertex");
       } else {
-        vertex.createTasks(vertex);
+        vertex.createTasks();
       }
 
       if (vertex.inputsWithInitializers != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index be4534b..6067190 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -64,13 +65,16 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.processor.FilterByWordInputProcessor;
 import org.apache.tez.processor.FilterByWordOutputProcessor;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 
@@ -80,13 +84,28 @@ public class FilterLinesByWord {
 
   public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
 
+  private static void printUsage() {
+    System.err.println("Usage filtelinesrbyword <in> <out> <filter_word> [-generateSplitsInClient true/<false>]");
+  }
 
   public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TezException {
     Configuration conf = new Configuration();
     String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 
+    boolean generateSplitsInClient = false;
+    
+    SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
+    try {
+      generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
+      otherArgs = splitCmdLineParser.getRemainingArgs();
+    } catch (ParseException e1) {
+      System.err.println("Invalid options");
+      printUsage();
+      System.exit(2);
+    }
+
     if (otherArgs.length != 3) {
-      System.err.println("Usage filtelinesrbyword <in> <out> <filter_word>");
+      printUsage();
       System.exit(2);
     }
 
@@ -140,7 +159,10 @@ public class FilterLinesByWord {
     stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, TextLongPair.class.getName());
     stage1Conf.set(FILTER_PARAM_NAME, filterWord);
 
-    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
+    InputSplitInfo inputSplitInfo = null;
+    if (generateSplitsInClient) {
+      inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
+    }
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
 
 
@@ -155,24 +177,33 @@ public class FilterLinesByWord {
     MRHelpers.doJobClientMagic(stage1Conf);
     MRHelpers.doJobClientMagic(stage2Conf);
 
+    byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
+    int stage1NumTasks = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
-        FilterByWordInputProcessor.class.getName()).setUserPayload(MRHelpers
-        .createUserPayloadFromConf(stage1Conf)), inputSplitInfo.getNumTasks(),
-        MRHelpers.getMapResource(stage1Conf));
-    stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf)).setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
-    Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
-    stage1LocalResources.putAll(commonLocalResources);
-    MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, stage1LocalResources);
-    stage1Vertex.setTaskLocalResources(stage1LocalResources);
+        FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload),
+        stage1NumTasks, MRHelpers.getMapResource(stage1Conf));
+    stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
+    if (generateSplitsInClient) {
+      stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+      Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
+      stage1LocalResources.putAll(commonLocalResources);
+      MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, stage1LocalResources);
+      stage1Vertex.setTaskLocalResources(stage1LocalResources);
+    } else {
+      stage1Vertex.setTaskLocalResources(commonLocalResources);
+    }
     Map<String, String> stage1Env = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(stage1Conf, stage1Env, true);
     stage1Vertex.setTaskEnvironment(stage1Env);
     
     // Configure the Input for stage1
+    Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
+        : MRInputAMSplitGenerator.class;
     stage1Vertex.addInput("MRInput",
         new InputDescriptor(MRInputLegacy.class.getName())
-            .setUserPayload(MRHelpers.createMRInputPayload(stage1Conf, null)), null);
+            .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)),
+        initializerClazz);
 
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index a1fb14a..b386811 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -77,6 +78,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
@@ -421,7 +423,8 @@ public class MRRSleepJob extends Configured implements Tool {
       int numMapper, int numReducer, int iReduceStagesCount,
       int numIReducer, long mapSleepTime, int mapSleepCount,
       long reduceSleepTime, int reduceSleepCount,
-      long iReduceSleepTime, int iReduceSleepCount, boolean writeSplitsToDFS)
+      long iReduceSleepTime, int iReduceSleepCount, boolean writeSplitsToDFS,
+      boolean generateSplitsInAM)
       throws IOException, YarnException {
 
 
@@ -518,24 +521,26 @@ public class MRRSleepJob extends Configured implements Tool {
     }
 
     InputSplitInfo inputSplitInfo = null;
-    if (writeSplitsToDFS) {
-      LOG.info("Writing splits to DFS");
-      try {
-        inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf,
-            remoteStagingDir);
-      } catch (InterruptedException e) {
-        throw new TezUncheckedException("Could not generate input splits", e);
-      } catch (ClassNotFoundException e) {
-        throw new TezUncheckedException("Failed to generate input splits", e);
-      }
-    } else {
-      try {
-        LOG.info("Creating in-mem splits");
-        inputSplitInfo = MRHelpers.generateInputSplitsToMem(mapStageConf);
-      } catch (ClassNotFoundException e) {
-        throw new TezUncheckedException("Could not generate input splits", e);
-      } catch (InterruptedException e) {
-        throw new TezUncheckedException("Could not generate input splits", e);
+    if (!generateSplitsInAM) {
+      if (writeSplitsToDFS) {
+        LOG.info("Writing splits to DFS");
+        try {
+          inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf,
+              remoteStagingDir);
+        } catch (InterruptedException e) {
+          throw new TezUncheckedException("Could not generate input splits", e);
+        } catch (ClassNotFoundException e) {
+          throw new TezUncheckedException("Failed to generate input splits", e);
+        }
+      } else {
+        try {
+          LOG.info("Creating in-mem splits");
+          inputSplitInfo = MRHelpers.generateInputSplitsToMem(mapStageConf);
+        } catch (ClassNotFoundException e) {
+          throw new TezUncheckedException("Could not generate input splits", e);
+        } catch (InterruptedException e) {
+          throw new TezUncheckedException("Could not generate input splits", e);
+        }
       }
     }
 
@@ -564,18 +569,21 @@ public class MRRSleepJob extends Configured implements Tool {
 
     
     byte[] mapInputPayload = null;
-    if (writeSplitsToDFS) {
-      mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, null);
+    byte[] mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
+    if (writeSplitsToDFS || generateSplitsInAM) {
+      mapInputPayload = MRHelpers.createMRInputPayload(mapUserPayload, null);
     } else {
-      mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, inputSplitInfo.getSplitsProto());
+      mapInputPayload = MRHelpers.createMRInputPayload(mapUserPayload, inputSplitInfo.getSplitsProto());
     }
+    int numTasks = generateSplitsInAM ? -1 : numMapper;
     
-    byte[] mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
     Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(mapUserPayload),
-        numMapper, MRHelpers.getMapResource(mapStageConf));
+        numTasks, MRHelpers.getMapResource(mapStageConf));
     mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
-    mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    if (!generateSplitsInAM) {
+      mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    }
     
     if (writeSplitsToDFS) {
       Map<String, LocalResource> mapLocalResources = new HashMap<String, LocalResource>();
@@ -590,10 +598,14 @@ public class MRRSleepJob extends Configured implements Tool {
     Map<String, String> mapEnv = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
     mapVertex.setTaskEnvironment(mapEnv);
-    if (writeSplitsToDFS) {
-      MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
+    if (generateSplitsInAM) {
+      MRHelpers.addMRInput(mapVertex, mapInputPayload, MRInputAMSplitGenerator.class);
     } else {
-      MRHelpers.addMRInput(mapVertex, mapInputPayload, MRInputSplitDistributor.class);
+      if (writeSplitsToDFS) {
+        MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
+      } else {
+        MRHelpers.addMRInput(mapVertex, mapInputPayload, MRInputSplitDistributor.class);
+      }
     }
     vertices.add(mapVertex);
 
@@ -720,6 +732,7 @@ public class MRRSleepJob extends Configured implements Tool {
           " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
           " [-irt intermediateReduceSleepTime]" +
           " [-recordt recordSleepTime (msec)]" +
+          " [-generateSplitsInAM (fale)/true]" +
           " [-writeSplitsToDfs (false)/true]");
       ToolRunner.printGenericCommandUsage(System.err);
       return 2;
@@ -731,6 +744,8 @@ public class MRRSleepJob extends Configured implements Tool {
     int mapSleepCount = 1, reduceSleepCount = 1, iReduceSleepCount = 1;
     int iReduceStagesCount = 1;
     boolean writeSplitsToDfs = false;
+    boolean generateSplitsInAM = false;
+    boolean splitsOptionFound = false;
 
     for(int i=0; i < args.length; i++ ) {
       if(args[i].equals("-m")) {
@@ -757,7 +772,19 @@ public class MRRSleepJob extends Configured implements Tool {
       else if (args[i].equals("-recordt")) {
         recSleepTime = Long.parseLong(args[++i]);
       }
+      else if (args[i].equals("-generateSplitsInAM")) {
+        if (splitsOptionFound) {
+          throw new RuntimeException("Cannot use both -generateSplitsInAm and -writeSplitsToDfs together");
+        }
+        splitsOptionFound = true;
+        generateSplitsInAM = Boolean.parseBoolean(args[++i]);
+        
+      }
       else if (args[i].equals("-writeSplitsToDfs")) {
+        if (splitsOptionFound) {
+          throw new RuntimeException("Cannot use both -generateSplitsInAm and -writeSplitsToDfs together");
+        }
+        splitsOptionFound = true;
         writeSplitsToDfs = Boolean.parseBoolean(args[++i]);
       }
     }
@@ -794,7 +821,7 @@ public class MRRSleepJob extends Configured implements Tool {
     DAG dag = createDAG(remoteFs, conf, appId, remoteStagingDir,
         numMapper, numReducer, iReduceStagesCount, numIReducer,
         mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
-        iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs);
+        iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs, generateSplitsInAM);
 
     conf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
         MRHelpers.getMRAMJavaOpts(conf));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 196e3d4..1ae1d57 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.StringTokenizer;
 import java.util.TreeMap;
 
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -59,6 +60,9 @@ import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -66,17 +70,17 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
@@ -145,7 +149,8 @@ public class OrderedWordCount {
 
   private static DAG createDAG(FileSystem fs, Configuration conf,
       Map<String, LocalResource> commonLocalResources, Path stagingDir,
-      int dagIndex, String inputPath, String outputPath) throws Exception {
+      int dagIndex, String inputPath, String outputPath,
+      boolean generateSplitsInClient) throws Exception {
 
     Configuration mapStageConf = new JobConf(conf);
     mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
@@ -159,9 +164,11 @@ public class OrderedWordCount {
     mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
     mapStageConf.setBoolean("mapred.mapper.new-api", true);
 
-    InputSplitInfo inputSplitInfo =
-        MRHelpers.generateInputSplits(mapStageConf, stagingDir);
-    mapStageConf.setInt(MRJobConfig.NUM_MAPS, inputSplitInfo.getNumTasks());
+    InputSplitInfo inputSplitInfo = null;
+    if (generateSplitsInClient) {
+      inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf, stagingDir);
+      mapStageConf.setInt(MRJobConfig.NUM_MAPS, inputSplitInfo.getNumTasks());
+    }
 
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
         null);
@@ -202,22 +209,30 @@ public class OrderedWordCount {
     List<Vertex> vertices = new ArrayList<Vertex>();
 
     byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
-    byte[] mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, null);
+    byte[] mapInputPayload = MRHelpers.createMRInputPayload(mapPayload, null);
+    int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(mapPayload),
-        inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(mapStageConf));
+        numMaps, MRHelpers.getMapResource(mapStageConf));
     mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
-    mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
-    Map<String, LocalResource> mapLocalResources =
-        new HashMap<String, LocalResource>();
-    mapLocalResources.putAll(commonLocalResources);
-    MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo,
-        mapLocalResources);
-    mapVertex.setTaskLocalResources(mapLocalResources);
+    if (generateSplitsInClient) {
+      mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+      Map<String, LocalResource> mapLocalResources =
+          new HashMap<String, LocalResource>();
+      mapLocalResources.putAll(commonLocalResources);
+      MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo,
+          mapLocalResources);
+      mapVertex.setTaskLocalResources(mapLocalResources);
+    } else {
+      mapVertex.setTaskLocalResources(commonLocalResources);
+    }
+
     Map<String, String> mapEnv = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
     mapVertex.setTaskEnvironment(mapEnv);
-    MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
+    Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
+        : MRInputAMSplitGenerator.class;
+    MRHelpers.addMRInput(mapVertex, mapInputPayload, initializerClazz);
     vertices.add(mapVertex);
 
     Vertex ivertex = new Vertex("ivertex1", new ProcessorDescriptor(
@@ -263,19 +278,38 @@ public class OrderedWordCount {
     return dag;
   }
 
+  private static void printUsage() { // writes both usage forms to stderr
+    final String splitsOpt = " [-generateSplitsInClient true/<false>]";
+    System.err.println("Usage: orderedwordcount <in> <out>" + splitsOpt);
+    System.err.println("Usage (In Session Mode): orderedwordcount <in1> <out1> ... <inN> <outN>" + splitsOpt);
+  }
+  
+  
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    
+    boolean generateSplitsInClient = false;
+    
+    SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
+    try {
+      generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
+      otherArgs = splitCmdLineParser.getRemainingArgs();
+    } catch (ParseException e1) {
+      System.err.println("Invalid options");
+      printUsage();
+      System.exit(2);
+    }
+
     boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
     long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0)
         * 1000;
     if (((otherArgs.length%2) != 0)
         || (!useTezSession && otherArgs.length != 2)) {
-      System.err.println("Usage: wordcount <in> <out>");
-      System.err.println("Usage (In Session Mode):"
-          + " wordcount <in1> <out1> ... <inN> <outN>");
+      printUsage();
       System.exit(2);
     }
+
     List<String> inputPaths = new ArrayList<String>();
     List<String> outputPaths = new ArrayList<String>();
 
@@ -365,7 +399,7 @@ public class OrderedWordCount {
             + ", outputPath=" + outputPath);
 
         DAG dag = createDAG(fs, conf, commonLocalResources, stagingDir,
-            dagIndex, inputPath, outputPath);
+            dagIndex, inputPath, outputPath, generateSplitsInClient);
 
         DAGClient dagClient;
         if (useTezSession) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
new file mode 100644
index 0000000..93ec860
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.helpers;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Parses the optional {@code -generateSplitsInClient <true|false>} switch used by
+ * the example drivers. Each instance may complete at most one successful parse.
+ */
+public class SplitsInClientOptionParser {
+
+  private static final String OPTION_NAME = "generateSplitsInClient";
+
+  private CommandLine cmdLine;
+  private String[] otherArgs;
+  private boolean parsed = false;
+
+  /** Returns the arguments left after option parsing; requires a successful parse. */
+  public String[] getRemainingArgs() {
+    Preconditions.checkState(parsed,
+        "Cannot get remaining args without parsing");
+    return otherArgs;
+  }
+
+  /**
+   * Parses {@code args} and returns the option's boolean value, or
+   * {@code defaultVal} when the option is absent.
+   *
+   * @throws ParseException if commons-cli rejects the arguments
+   */
+  @SuppressWarnings("static-access")
+  public boolean parse(String[] args, boolean defaultVal) throws ParseException {
+    Preconditions.checkState(!parsed,
+        "Create a new instance for different option sets");
+    Options opts = new Options();
+    opts.addOption(OptionBuilder.withArgName("splits_in_client").hasArg()
+        .withDescription("specify whether splits should be generated in the client")
+        .create(OPTION_NAME));
+    cmdLine = new GnuParser().parse(opts, args, false);
+    otherArgs = cmdLine.getArgs();
+    // Flag success only now, so a failed parse never exposes null remaining args.
+    parsed = true;
+    return cmdLine.hasOption(OPTION_NAME)
+        ? Boolean.parseBoolean(cmdLine.getOptionValue(OPTION_NAME)) : defaultVal;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 2b001b4..3822b4d 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.client.TezSessionStatus;
@@ -70,6 +71,7 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.examples.MRRSleepJob;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
@@ -82,6 +84,7 @@ import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 import org.junit.AfterClass;
@@ -184,7 +187,7 @@ public class TestMRRJobsDAGApi {
   @Test(timeout = 60000)
   public void testMRRSleepJobDagSubmit() throws IOException,
   InterruptedException, TezException, ClassNotFoundException, YarnException {
-    State finalState = testMRRSleepJobDagSubmitCore(false, false, false);
+    State finalState = testMRRSleepJobDagSubmitCore(false, false, false, false);
 
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
     // TODO Add additional checks for tracking URL etc. - once it's exposed by
@@ -195,7 +198,7 @@ public class TestMRRJobsDAGApi {
   @Test(timeout = 60000)
   public void testMRRSleepJobDagSubmitAndKill() throws IOException,
   InterruptedException, TezException, ClassNotFoundException, YarnException {
-    State finalState = testMRRSleepJobDagSubmitCore(false, true, false);
+    State finalState = testMRRSleepJobDagSubmitCore(false, true, false, false);
 
     Assert.assertEquals(DAGStatus.State.KILLED, finalState);
     // TODO Add additional checks for tracking URL etc. - once it's exposed by
@@ -206,7 +209,7 @@ public class TestMRRJobsDAGApi {
   @Test(timeout = 60000)
   public void testMRRSleepJobViaSession() throws IOException,
   InterruptedException, TezException, ClassNotFoundException, YarnException {
-    State finalState = testMRRSleepJobDagSubmitCore(true, false, false);
+    State finalState = testMRRSleepJobDagSubmitCore(true, false, false, false);
 
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
   }
@@ -248,12 +251,12 @@ public class TestMRRJobsDAGApi {
         tezSession.getSessionStatus());
 
     State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
-        tezSession);
+        tezSession, false);
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
     Assert.assertEquals(TezSessionStatus.READY,
         tezSession.getSessionStatus());
     finalState = testMRRSleepJobDagSubmitCore(true, false, false,
-        tezSession);
+        tezSession, false);
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
     Assert.assertEquals(TezSessionStatus.READY,
         tezSession.getSessionStatus());
@@ -290,7 +293,7 @@ public class TestMRRJobsDAGApi {
   @Test(timeout = 60000)
   public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
   InterruptedException, TezException, ClassNotFoundException, YarnException {
-    State finalState = testMRRSleepJobDagSubmitCore(true, true, false);
+    State finalState = testMRRSleepJobDagSubmitCore(true, true, false, false);
 
     Assert.assertEquals(DAGStatus.State.KILLED, finalState);
     // TODO Add additional checks for tracking URL etc. - once it's exposed by
@@ -301,17 +304,24 @@ public class TestMRRJobsDAGApi {
   @Test(timeout = 60000)
   public void testTezSessionShutdown() throws IOException,
   InterruptedException, TezException, ClassNotFoundException, YarnException {
-    testMRRSleepJobDagSubmitCore(true, false, true);
+    testMRRSleepJobDagSubmitCore(true, false, true, false);
+  }
+
+  @Test(timeout = 60000)
+  public void testAMSplitGeneration() throws IOException, InterruptedException,
+      TezException, ClassNotFoundException, YarnException {
+    testMRRSleepJobDagSubmitCore(true, false, false, true);
   }
 
   public State testMRRSleepJobDagSubmitCore(
       boolean dagViaRPC,
       boolean killDagWhileRunning,
-      boolean closeSessionBeforeSubmit) throws IOException,
+      boolean closeSessionBeforeSubmit,
+      boolean genSplitsInAM) throws IOException,
       InterruptedException, TezException, ClassNotFoundException,
       YarnException {
     return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
-        closeSessionBeforeSubmit, null);
+        closeSessionBeforeSubmit, null, genSplitsInAM);
   }
 
   private Map<String, String> createCommonEnv() {
@@ -326,7 +336,8 @@ public class TestMRRJobsDAGApi {
       boolean dagViaRPC,
       boolean killDagWhileRunning,
       boolean closeSessionBeforeSubmit,
-      TezSession reUseTezSession) throws IOException,
+      TezSession reUseTezSession,
+      boolean genSplitsInAM) throws IOException,
       InterruptedException, TezException, ClassNotFoundException,
       YarnException {
     LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
@@ -395,18 +406,25 @@ public class TestMRRJobsDAGApi {
 
     Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
         .valueOf(new Random().nextInt(100000))));
-    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf,
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+    InputSplitInfo inputSplitInfo = null;
+    if (!genSplitsInAM) {
+      inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf,
         remoteStagingDir);
+    }
 
     byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
-    byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Conf, null);
+    byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Payload, null);
     byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
     
     DAG dag = new DAG("testMRRSleepJobDagSubmit");
+    int stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
+    Class<? extends TezRootInputInitializer> inputInitializerClazz = genSplitsInAM ? MRInputAMSplitGenerator.class
+        : null;
     Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(stage1Payload),
-        inputSplitInfo.getNumTasks(), Resource.newInstance(256, 1));
-    MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, null);
+        stage1NumTasks, Resource.newInstance(256, 1));
+    MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz);
     Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
         ReduceProcessor.class.getName()).setUserPayload(
         MRHelpers.createUserPayloadFromConf(stage2Conf)),
@@ -428,21 +446,26 @@ public class TestMRRJobsDAGApi {
 
     Map<String, String> commonEnv = createCommonEnv();
 
-    // TODO Use utility method post TEZ-205.
-    Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
-    stage1LocalResources.put(
-        inputSplitInfo.getSplitsFile().getName(),
-        createLocalResource(remoteFs, inputSplitInfo.getSplitsFile(),
-            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
-    stage1LocalResources.put(
-        inputSplitInfo.getSplitsMetaInfoFile().getName(),
-        createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
-            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
-    stage1LocalResources.putAll(commonLocalResources);
+    if (!genSplitsInAM) {
+      // TODO Use utility method post TEZ-205.
+      Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
+      stage1LocalResources.put(
+          inputSplitInfo.getSplitsFile().getName(),
+          createLocalResource(remoteFs, inputSplitInfo.getSplitsFile(),
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      stage1LocalResources.put(
+          inputSplitInfo.getSplitsMetaInfoFile().getName(),
+          createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      stage1LocalResources.putAll(commonLocalResources);
+
+      stage1Vertex.setTaskLocalResources(stage1LocalResources);
+      stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    } else {
+      stage1Vertex.setTaskLocalResources(commonLocalResources);
+    }
 
     stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
-    stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
-    stage1Vertex.setTaskLocalResources(stage1LocalResources);
     stage1Vertex.setTaskEnvironment(commonEnv);
 
     // TODO env, resources

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
new file mode 100644
index 0000000..cb92dbb
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.common;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Root input initializer that generates MR input splits in memory from the
+ * configuration carried in the input's user payload and emits them as events.
+ */
+public class MRInputAMSplitGenerator implements TezRootInputInitializer {
+
+  private static final Log LOG = LogFactory
+      .getLog(MRInputAMSplitGenerator.class);
+
+  public MRInputAMSplitGenerator() {
+  }
+
+  /**
+   * Builds the initialization events for the root input.
+   * @param rootInputContext context supplying the serialized MRInput payload
+   * @return a vertex-configuration event, then one data event per split (index from 0)
+   * @throws Exception if payload parsing or split generation fails
+   */
+  @Override
+  public List<Event> initialize(TezRootInputInitializerContext rootInputContext)
+      throws Exception {
+    // Sample the log level once: re-checking it per phase could hit a null
+    // stopwatch if debug logging were switched on part-way through.
+    final boolean debug = LOG.isDebugEnabled();
+    final Stopwatch sw = debug ? new Stopwatch().start() : null;
+    MRInputUserPayloadProto userPayloadProto = MRHelpers
+        .parseMRInputPayload(rootInputContext.getUserPayload());
+    if (debug) {
+      sw.stop();
+      LOG.debug("Time to parse MRInput payload into prot: "
+          + sw.elapsedMillis());
+      sw.reset().start();
+    }
+    Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
+        .getConfigurationBytes());
+    if (debug) {
+      sw.stop();
+      LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis());
+      sw.reset().start();
+    }
+    InputSplitInfoMem inputSplitInfo = MRHelpers.generateInputSplitsToMem(conf);
+    if (debug) {
+      sw.stop();
+      LOG.debug("Time to create splits to mem: " + sw.elapsedMillis());
+    }
+
+    List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo
+        .getNumTasks() + 1);
+    events.add(new RootInputConfigureVertexTasksEvent(
+        inputSplitInfo.getNumTasks(), inputSplitInfo.getTaskLocationHints()));
+
+    int count = 0;
+    for (MRSplitProto mrSplit : inputSplitInfo.getSplitsProto().getSplitsList()) {
+      // Unnecessary array copy, can be avoided by using ByteBuffer instead of a
+      // raw array.
+      events.add(new RootInputDataInformationEvent(count++, mrSplit.toByteArray()));
+    }
+    return events;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index c35a450..b6bdcfc 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -24,15 +24,16 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRUserPayloadProto;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 
 public class MRInputSplitDistributor implements TezRootInputInitializer {
@@ -43,26 +44,35 @@ public class MRInputSplitDistributor implements TezRootInputInitializer {
   public MRInputSplitDistributor() {
   }
 
-  private MRSplitsProto splitProto;
+  private MRSplitsProto splitsProto;
 
   @Override
   public List<Event> initialize(TezRootInputInitializerContext rootInputContext)
       throws IOException {
+    Stopwatch sw = null;
+    if (LOG.isDebugEnabled()) {
+      sw = new Stopwatch().start();
+    }
+    MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
+    if (LOG.isDebugEnabled()) {
+      sw.stop();
+      LOG.debug("Time to parse MRInput payload into prot: "
+          + sw.elapsedMillis());  
+    }
     
-    MRUserPayloadProto userPayloadProto = MRHelpers.parseMRPayload(rootInputContext.getUserPayload());
 
-    this.splitProto = userPayloadProto.getSplits();
+    this.splitsProto = userPayloadProto.getSplits();
     
-    MRUserPayloadProto.Builder updatedPayloadBuilder = MRUserPayloadProto.newBuilder(userPayloadProto);
+    MRInputUserPayloadProto.Builder updatedPayloadBuilder = MRInputUserPayloadProto.newBuilder(userPayloadProto);
     updatedPayloadBuilder.clearSplits();
 
-    List<Event> events = Lists.newArrayListWithCapacity(this.splitProto.getSplitsCount() + 1);
+    List<Event> events = Lists.newArrayListWithCapacity(this.splitsProto.getSplitsCount() + 1);
     RootInputUpdatePayloadEvent updatePayloadEvent = new RootInputUpdatePayloadEvent(
         updatedPayloadBuilder.build().toByteArray());
 
     events.add(updatePayloadEvent);
     int count = 0;
-    for (MRSplitProto mrSplit : this.splitProto.getSplitsList()) {
+    for (MRSplitProto mrSplit : this.splitsProto.getSplitsList()) {
       // Unnecessary array copy, can be avoided by using ByteBuffer instead of a
       // raw array.
       RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 5271069..cc61a9a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -72,9 +72,9 @@ import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRUserPayloadProto;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 
 import com.google.common.base.Preconditions;
@@ -618,21 +618,38 @@ public class MRHelpers {
     return TezUtils.createConfFromByteString(bs);
   }
 
+  public static byte[] createMRInputPayload(byte[] configurationBytes,
+      MRSplitsProto mrSplitsProto) {
+    // Configuration bytes are mandatory; the splits proto is optional.
+    Preconditions.checkArgument(configurationBytes != null,
+        "Configuration bytes must be specified");
+    MRInputUserPayloadProto.Builder payload = MRInputUserPayloadProto.newBuilder()
+        .setConfigurationBytes(ByteString.copyFrom(configurationBytes));
+    if (mrSplitsProto != null) {
+      // Embed the splits only when the caller supplies them.
+      payload.setSplits(mrSplitsProto);
+    }
+    return payload.build().toByteArray();
+  }
+  
   public static byte[] createMRInputPayload(Configuration conf,
-      MRSplitsProto mrSplitProto) throws IOException {
-    MRUserPayloadProto.Builder userPayloadBuilder = MRUserPayloadProto
+      MRSplitsProto mrSplitsProto) throws IOException {
+    Preconditions
+        .checkArgument(conf != null, "Configuration must be specified");
+    MRInputUserPayloadProto.Builder userPayloadBuilder = MRInputUserPayloadProto
         .newBuilder();
     userPayloadBuilder.setConfigurationBytes(createByteStringFromConf(conf));
-    if (mrSplitProto != null) {
-      userPayloadBuilder.setSplits(mrSplitProto);
+    if (mrSplitsProto != null) {
+      userPayloadBuilder.setSplits(mrSplitsProto);
     }
     // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
     // more efficient.
     return userPayloadBuilder.build().toByteArray();
   }
 
-  public static MRUserPayloadProto parseMRPayload(byte[] bytes) throws IOException {
-    return MRUserPayloadProto.parseFrom(bytes);
+  public static MRInputUserPayloadProto parseMRInputPayload(byte[] bytes)
+      throws IOException {
+    return MRInputUserPayloadProto.parseFrom(bytes);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 6951261..65eca5f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -657,7 +657,7 @@ public interface MRJobConfig {
 
   public static final String MR_TEZ_SPLITS_VIA_EVENTS = MR_TEZ_PREFIX + "splits.via.events";
   public static final boolean MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT = true;
-  
+
   // Stage specific properties
   // Format of each property is mapred.ireducer.stage.<stage-num>.<suffix>
   // where suffix is one of MRR_INTERMEDIATE_STAGE_* fields defined below.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index d8f3709..1de90a0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -52,8 +52,8 @@ import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
 import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRUserPayloadProto;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -110,7 +110,7 @@ public class MRInput implements LogicalInput {
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
-    MRUserPayloadProto mrUserPayload = MRHelpers.parseMRPayload(inputContext.getUserPayload());
+    MRInputUserPayloadProto mrUserPayload = MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
     Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
         "All split information not expected in MRInput");
     Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto b/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto
index ca9f5b9..bc6d074 100644
--- a/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto
+++ b/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto
@@ -34,7 +34,7 @@ message MRSplitProto {
   optional bytes split_bytes = 2;
 }
 
-message MRUserPayloadProto {
+message MRInputUserPayloadProto {
   optional bytes configuration_bytes = 1;
   optional MRSplitsProto splits = 2;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad5666a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 63e62ae..87df0b0 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -406,7 +406,7 @@ public class YARNRunner implements ClientProtocol {
         setUserPayload(vertexUserPayload),
         numTasks, taskResource);
     if (isMap) {
-      byte[] mapInputPayload = MRHelpers.createMRInputPayload(stageConf, null);
+      byte[] mapInputPayload = MRHelpers.createMRInputPayload(vertexUserPayload, null);
       MRHelpers.addMRInput(vertex, mapInputPayload, null);
     }
     // Map only jobs.


Mime
View raw message