tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [2/2] git commit: TEZ-779. Make Tez grouping splits logic possible outside InputFormat (bikas)
Date Sun, 23 Feb 2014 04:00:17 GMT
TEZ-779. Make Tez grouping splits logic possible outside InputFormat (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/25cb8930
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/25cb8930
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/25cb8930

Branch: refs/heads/master
Commit: 25cb893052ac349943842869db4609c41683a3f6
Parents: e17e750
Author: Bikas Saha <bikas@apache.org>
Authored: Sat Feb 22 20:00:03 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Sat Feb 22 20:00:03 2014 -0800

----------------------------------------------------------------------
 .../tez/dag/api/VertexManagerPluginContext.java |  21 +
 .../api/TezRootInputInitializerContext.java     |  42 ++
 .../dag/app/dag/RootInputInitializerRunner.java |  26 +-
 .../TezRootInputInitializerContextImpl.java     |  25 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  30 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |  16 +
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   4 +
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  12 +-
 .../split/TezGroupedSplitsInputFormat.java      | 403 +-----------------
 .../mapred/split/TezMapredSplitsGrouper.java    | 425 +++++++++++++++++++
 .../split/TezGroupedSplitsInputFormat.java      | 343 +--------------
 .../split/TezMapReduceSplitsGrouper.java        | 420 ++++++++++++++++++
 .../common/MRInputAMSplitGenerator.java         |  20 +-
 13 files changed, 1012 insertions(+), 775 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 0e21a92..a281b90 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 
 /**
@@ -57,6 +58,26 @@ public interface VertexManagerPluginContext {
   public int getVertexNumTasks(String vertexName);
   
   /**
+   * Get the resource allocated to a task of this vertex
+   * @return Resource
+   */
+  Resource getVertexTaskResource();
+  
+  /**
+   * Get the total resource allocated to this vertex. If the DAG is running in 
+   * a busy cluster then it may have no resources available dedicated to it. The
+   * DAG may divide its available resource among member vertices.
+   * @return Resource
+   */
+  Resource getTotalAVailableResource();
+  
+  /**
+   * Get the number of nodes in the cluster
+   * @return Number of nodes
+   */
+  int getNumClusterNodes();
+  
+  /**
    * Set the new parallelism (number of tasks) of this vertex.
    * Map of source (input) vertices and edge managers to change the event routing
    * between the source tasks and the new destination tasks.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
index 7c075e6..3dea536 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
@@ -19,16 +19,58 @@
 package org.apache.tez.runtime.api;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
 
 public interface TezRootInputInitializerContext {
 
+  /**
+   * Get the YARN application id given to the Tez Application Master
+   * @return Application id
+   */
   ApplicationId getApplicationId();
   
+  /**
+   * Get the name of the DAG
+   * @return DAG name
+   */
   String getDAGName();
   
+  /**
+   * Get the name of the input
+   * @return Input name
+   */
   String getInputName();
 
+  /**
+   * Get the user payload
+   * @return User payload
+   */
   byte[] getUserPayload();
   
+  /**
+   * Get the number of tasks in this vertex. May be -1 if the vertex has not been
+   * initialized with a pre-determined number of tasks.
+   * @return number of tasks
+   */
   int getNumTasks();
+  
+  /**
+   * Get the resource allocated to a task of this vertex
+   * @return Resource
+   */
+  Resource getVertexTaskResource();
+  
+  /**
+   * Get the total resource allocated to this vertex. If the DAG is running in 
+   * a busy cluster then it may have no resources available dedicated to it. The
+   * DAG may divide its resources among member vertices.
+   * @return Resource
+   */
+  Resource getTotalAvailableResource();
+  
+  /**
+   * Get the number of nodes in the cluster
+   * @return Number of nodes
+   */
+  int getNumClusterNodes();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
index 3f0d24d..7d3cd28 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Executors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
@@ -56,30 +57,37 @@ public class RootInputInitializerRunner {
   private final String vertexName;
   private final TezVertexID vertexID;
   private final int numTasks;
+  private final Resource vertexTaskResource;
+  private final Resource totalResource;
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
   private volatile boolean isStopped = false;
   private final UserGroupInformation dagUgi;
+  private final int numClusterNodes;
 
   @SuppressWarnings("rawtypes")
   public RootInputInitializerRunner(String dagName, String vertexName,
-      TezVertexID vertexID, EventHandler eventHandler, UserGroupInformation dagUgi, int numTasks) {
+      TezVertexID vertexID, EventHandler eventHandler, UserGroupInformation dagUgi,
+      Resource vertexTaskResource, Resource totalResource, int numTasks, int numNodes) {
     this.dagName = dagName;
     this.vertexName = vertexName;
     this.vertexID = vertexID;
     this.eventHandler = eventHandler;
+    this.vertexTaskResource = vertexTaskResource;
+    this.totalResource = totalResource;
     this.numTasks = numTasks;
     this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("InputInitializer [" + this.vertexName + "] #%d").build());
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     this.dagUgi = dagUgi;
+    this.numClusterNodes = numNodes;
   }
-
+  
   public void runInputInitializers(List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs) {
     for (RootInputLeafOutputDescriptor<InputDescriptor> input : inputs) {
       ListenableFuture<List<Event>> future = executor
           .submit(new InputInitializerCallable(input, vertexID, dagName,
-              vertexName, dagUgi, numTasks));
+              vertexName, dagUgi, numTasks, numClusterNodes, vertexTaskResource, totalResource));
       Futures.addCallback(future, createInputInitializerCallback(input.getEntityName()));
     }
   }
@@ -106,16 +114,23 @@ public class RootInputInitializerRunner {
     private final String dagName;
     private final String vertexName;
     private final int numTasks;
+    private final Resource vertexTaskResource;
+    private final Resource totalResource;
     private final UserGroupInformation ugi;
+    private final int numClusterNodes;
 
     public InputInitializerCallable(RootInputLeafOutputDescriptor<InputDescriptor> input,
-        TezVertexID vertexID, String dagName, String vertexName, UserGroupInformation ugi, int numTasks) {
+        TezVertexID vertexID, String dagName, String vertexName, UserGroupInformation ugi, 
+        int numTasks, int numClusterNodes, Resource vertexTaskResource, Resource totalResource) {
       this.input = input;
       this.vertexID = vertexID;
       this.dagName = dagName;
       this.vertexName = vertexName;
       this.numTasks = numTasks;
+      this.vertexTaskResource = vertexTaskResource;
+      this.totalResource = totalResource;
       this.ugi = ugi;
+      this.numClusterNodes = numClusterNodes;
     }
 
     @Override
@@ -125,7 +140,8 @@ public class RootInputInitializerRunner {
         public List<Event> run() throws Exception {
           TezRootInputInitializer initializer = createInitializer();
           TezRootInputInitializerContext context = new TezRootInputInitializerContextImpl(vertexID,
-              dagName, vertexName, input.getEntityName(), input.getDescriptor(), numTasks);
+              dagName, vertexName, input.getEntityName(), input.getDescriptor(), 
+              numTasks, numClusterNodes, vertexTaskResource, totalResource);
           return initializer.initialize(context);
         }
       });

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index 7e116ee..c55142f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
@@ -31,17 +32,24 @@ public class TezRootInputInitializerContextImpl implements
   private final String inputName;
   private final InputDescriptor inputDescriptor;
   private final int numTasks;
+  private final Resource vertexTaskResource;
+  private final Resource totalResource;
+  private final int numClusterNodes;
 
   // TODO Add support for counters - merged with the Vertex counters.
   
   public TezRootInputInitializerContextImpl(TezVertexID vertexID,
       String dagName, String vertexName, String inputName,
-      InputDescriptor inputDescriptor, int numTasks) {
+      InputDescriptor inputDescriptor, int numTasks, int numClusterNodes,
+      Resource vertexTaskResource, Resource totalResource) {
     this.vertexID = vertexID;
     this.dagName = dagName;
     this.inputName = inputName;
     this.inputDescriptor = inputDescriptor;
     this.numTasks = numTasks;
+    this.vertexTaskResource = vertexTaskResource;
+    this.totalResource = totalResource;
+    this.numClusterNodes = numClusterNodes;
   }
 
   @Override
@@ -69,4 +77,19 @@ public class TezRootInputInitializerContextImpl implements
     return numTasks;
   }
 
+  @Override
+  public Resource getVertexTaskResource() {
+    return vertexTaskResource;
+  }
+
+  @Override
+  public Resource getTotalAvailableResource() {
+    return totalResource;
+  }
+
+  @Override
+  public int getNumClusterNodes() {
+    return numClusterNodes;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5ec55ee..a9139ab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -60,7 +60,6 @@ import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -1474,23 +1473,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
         if (vertex.inputsWithInitializers != null) {
           // Use DAGScheduler to arbitrate resources among vertices later
-          // Ask for 1.5 the number of tasks we can fit in one wave
-          int totalResource = vertex.appContext.getTaskScheduler()
-              .getTotalResources().getMemory();
-          int taskResource = vertex.getTaskResource().getMemory();
-          float waves = vertex.conf.getFloat(
-              TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-              TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
-
-          int numTasks = (int)((totalResource*waves)/taskResource);
-
-          LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks
-              + " tasks. Headroom: " + totalResource + " Task Resource: "
-              + taskResource + " waves: " + waves);
-
           vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
               vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
-              vertex.eventHandler, numTasks);
+              vertex.eventHandler, -1, 
+              vertex.appContext.getTaskScheduler().getNumClusterNodes(),
+              vertex.getTaskResource(), 
+              vertex.appContext.getTaskScheduler().getTotalResources());
           List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
               .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
           for (String inputName : vertex.inputsWithInitializers) {
@@ -1523,7 +1511,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         if (vertex.inputsWithInitializers != null) {
           vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
               vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
-              vertex.eventHandler, vertex.getTotalTasks());
+              vertex.eventHandler, vertex.getTotalTasks(), 
+              vertex.appContext.getTaskScheduler().getNumClusterNodes(),
+              vertex.getTaskResource(), 
+              vertex.appContext.getTaskScheduler().getTotalResources());
           List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
               .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
           for (String inputName : vertex.inputsWithInitializers) {
@@ -1545,9 +1536,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   @VisibleForTesting
   protected RootInputInitializerRunner createRootInputInitializerRunner(
       String dagName, String vertexName, TezVertexID vertexID,
-      EventHandler eventHandler, int numTasks) {
+      EventHandler eventHandler, int numTasks, int numNodes, 
+      Resource vertexTaskResource, Resource totalResource) {
     return new RootInputInitializerRunner(dagName, vertexName, vertexID,
-        eventHandler, dagUgi, numTasks);
+        eventHandler, dagUgi, vertexTaskResource, totalResource, numTasks, numNodes);
   }
   
   private VertexState initializeVertexInInitializingState() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index df7696b..4b110b2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -159,6 +160,21 @@ public class VertexManager {
       }
       return destMeta;
     }
+
+    @Override
+    public Resource getVertexTaskResource() {
+      return managedVertex.getTaskResource();
+    }
+
+    @Override
+    public Resource getTotalAVailableResource() {
+      return appContext.getTaskScheduler().getTotalResources();
+    }
+
+    @Override
+    public int getNumClusterNodes() {
+      return appContext.getTaskScheduler().getNumClusterNodes();
+    }
   }
   
   public VertexManager(VertexManagerPlugin plugin, 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 83816b8..a597ee1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -104,6 +104,10 @@ public class TaskSchedulerEventHandler extends AbstractService
     LOG.info("TaskScheduler notified that iSignalled was : " + isSignalled);
   }
 
+  public int getNumClusterNodes() {
+    return cachedNodeCount;
+  }
+  
   public Resource getAvailableResources() {
     return taskScheduler.getAvailableResources();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index c2ff32d..0bf2b8c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2203,10 +2203,11 @@ public class TestVertexImpl {
     @Override
     protected RootInputInitializerRunner createRootInputInitializerRunner(
         String dagName, String vertexName, TezVertexID vertexID,
-        EventHandler eventHandler, int numTasks) {
+        EventHandler eventHandler, int numTasks, int numNodes, 
+        Resource taskResource, Resource totalResource) {
       try {
         rootInputInitializerRunner = new RootInputInitializerRunnerControlled(dagName, vertexName, vertexID,
-            eventHandler, numTasks, dispatcher);
+            eventHandler, numTasks, dispatcher, taskResource, totalResource);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -2230,8 +2231,11 @@ public class TestVertexImpl {
 
     public RootInputInitializerRunnerControlled(String dagName,
         String vertexName, TezVertexID vertexID, EventHandler eventHandler,
-        int numTasks, DrainDispatcher dispatcher) throws IOException {
-      super(dagName, vertexName, vertexID, eventHandler, UserGroupInformation.getCurrentUser(), numTasks);
+        int numTasks, DrainDispatcher dispatcher,
+        Resource taskResource, Resource totalResource) throws IOException {
+      super(dagName, vertexName, vertexID, eventHandler, 
+          UserGroupInformation.getCurrentUser(), 
+          taskResource, totalResource, numTasks, 1);
       this.eventHandler = eventHandler;
       this.dispatcher = dispatcher;
       this.vertexID = vertexID;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index d4f16a5..6a215de 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -19,12 +19,6 @@
 package org.apache.hadoop.mapred.split;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,8 +30,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 import com.google.common.base.Preconditions;
@@ -70,403 +62,12 @@ public class TezGroupedSplitsInputFormat<K, V>
     }
   }
   
-  class SplitHolder {
-    InputSplit split;
-    boolean isProcessed = false;
-    SplitHolder(InputSplit split) {
-      this.split = split;
-    }
-  }
-  
-  class LocationHolder {
-    List<SplitHolder> splits;
-    int headIndex = 0;
-    LocationHolder(int capacity) {
-      splits = new ArrayList<SplitHolder>(capacity);
-    }
-    boolean isEmpty() {
-      return (headIndex == splits.size());
-    }
-    SplitHolder getUnprocessedHeadSplit() {
-      while (!isEmpty()) {
-        SplitHolder holder = splits.get(headIndex);
-        if (!holder.isProcessed) {
-          return holder;
-        }
-        incrementHeadIndex();
-      }
-      return null;
-    }
-    void incrementHeadIndex() {
-      headIndex++;
-    }
-  }
-  
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    LOG.info("Grouping splits in Tez");
-
-    int configNumSplits = conf.getInt(TezConfiguration.TEZ_AM_GROUPING_SPLIT_COUNT, 0);
-    if (configNumSplits > 0) {
-      // always use config override if specified
-      desiredNumSplits = configNumSplits;
-      LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
-    }
-    
-    if (desiredNumSplits > 0) {
-      // get the desired num splits directly if possible
-      numSplits = desiredNumSplits;
-    }
     InputSplit[] originalSplits = wrappedInputFormat.getSplits(job, numSplits);
-    
-    if (! (configNumSplits > 0 || 
-          originalSplits == null || 
-          originalSplits.length == 0) ) {
-      // numSplits has not been overridden by config
-      // numSplits has been set at runtime
-      // there are splits generated
-      // Do sanity checks
-      long totalLength = 0;
-      for (InputSplit split : originalSplits) {
-        totalLength += split.getLength();
-      }
-
-      int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.length;
-      long lengthPerGroup = totalLength/splitCount;
-      
-      long maxLengthPerGroup = job.getLong(
-          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MAX_SIZE,
-          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
-      long minLengthPerGroup = job.getLong(
-          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MIN_SIZE,
-          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MIN_SIZE_DEFAULT);
-      if (maxLengthPerGroup < minLengthPerGroup || 
-          minLengthPerGroup <=0) {
-        throw new TezUncheckedException(
-          "Invalid max/min group lengths. Required min>0, max>=min. " +
-          " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);
-      }
-      if (lengthPerGroup > maxLengthPerGroup) {
-        // splits too big to work. Need to override with max size.
-        int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
-        LOG.info("Desired splits: " + desiredNumSplits + " too small. " + 
-            " Desired splitLength: " + lengthPerGroup + 
-            " Max splitLength: " + maxLengthPerGroup +
-            " New desired splits: " + newDesiredNumSplits + 
-            " Total length: " + totalLength +
-            " Original splits: " + originalSplits.length);
-        
-        desiredNumSplits = newDesiredNumSplits;
-        if (desiredNumSplits > originalSplits.length) {
-          // too few splits were produced. See if we can produce more splits
-          LOG.info("Recalculating splits. Original splits: " + originalSplits.length);
-          originalSplits = wrappedInputFormat.getSplits(job, desiredNumSplits);
-        }
-      } else if (lengthPerGroup < minLengthPerGroup) {
-        // splits too small to work. Need to override with size.
-        int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
-        LOG.info("Desired splits: " + desiredNumSplits + " too large. " + 
-            " Desired splitLength: " + lengthPerGroup + 
-            " Min splitLength: " + minLengthPerGroup +
-            " New desired splits: " + newDesiredNumSplits + 
-            " Total length: " + totalLength +
-            " Original splits: " + originalSplits.length);
-        
-        desiredNumSplits = newDesiredNumSplits;
-        if (desiredNumSplits > originalSplits.length) {
-          // too few splits were produced. See if we can produce more splits
-          LOG.info("Recalculating splits. Original splits: " + originalSplits.length);
-          originalSplits = wrappedInputFormat.getSplits(job, desiredNumSplits);
-        }
-      }
-    }
-    
-    if (originalSplits == null) {
-      LOG.info("Null original splits");
-      return null;
-    }
-    
+    TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
     String wrappedInputFormatName = wrappedInputFormat.getClass().getName();
-    if (desiredNumSplits == 0 ||
-        originalSplits.length == 0 ||
-        desiredNumSplits >= originalSplits.length) {
-      // nothing set. so return all the splits as is
-      LOG.info("Using original number of splits: " + originalSplits.length +
-          " desired splits: " + desiredNumSplits);
-      InputSplit[] groupedSplits = new TezGroupedSplit[originalSplits.length];
-      int i=0;
-      for (InputSplit split : originalSplits) {
-        TezGroupedSplit newSplit = 
-            new TezGroupedSplit(1, wrappedInputFormatName, split.getLocations());
-        newSplit.addSplit(split);
-        groupedSplits[i++] = newSplit;
-      }
-      return groupedSplits;
-    }
-    
-    String emptyLocation = "EmptyLocation";
-    String[] emptyLocations = {emptyLocation};
-    List<InputSplit> groupedSplitsList = new ArrayList<InputSplit>(desiredNumSplits);
-    
-    long totalLength = 0;
-    Map<String, LocationHolder> distinctLocations = new HashMap<String, LocationHolder>();
-    // go through splits and add them to locations
-    for (InputSplit split : originalSplits) {
-      totalLength += split.getLength();
-      String[] locations = split.getLocations();
-      if (locations == null || locations.length == 0) {
-        locations = emptyLocations;
-      }
-      for (String location : locations ) {
-        if (location == null) {
-          location = emptyLocation;
-        }
-        distinctLocations.put(location, null);
-      }
-    }
-    
-    long lengthPerGroup = totalLength/desiredNumSplits;
-    int numNodeLocations = distinctLocations.size();
-    int numSplitsPerLocation = originalSplits.length/numNodeLocations;
-    int numSplitsInGroup = originalSplits.length/desiredNumSplits;
-
-    // allocation loop here so that we have a good initial size for the lists
-    for (String location : distinctLocations.keySet()) {
-      distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
-    }
-    
-    Set<String> locSet = new HashSet<String>();
-    for (InputSplit split : originalSplits) {
-      locSet.clear();
-      SplitHolder splitHolder = new SplitHolder(split);
-      String[] locations = split.getLocations();
-      if (locations == null || locations.length == 0) {
-        locations = emptyLocations;
-      }
-      for (String location : locations) {
-        if (location == null) {
-          location = emptyLocation;
-        }
-        locSet.add(location);
-      }
-      for (String location : locSet) {
-        LocationHolder holder = distinctLocations.get(location);
-        holder.splits.add(splitHolder);
-      }
-    }
-    
-    boolean groupByLength = conf.getBoolean(
-        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_LENGTH,
-        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
-    boolean groupByCount = conf.getBoolean(
-        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_COUNT,
-        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_COUNT_DEFAULT);
-    if (!(groupByLength || groupByCount)) {
-      throw new TezUncheckedException(
-          "None of the grouping parameters are true: "
-              + TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_LENGTH + ", "
-              + TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_COUNT);
-    }
-    LOG.info("Desired numSplits: " + desiredNumSplits +
-        " lengthPerGroup: " + lengthPerGroup +
-        " numLocations: " + numNodeLocations +
-        " numSplitsPerLocation: " + numSplitsPerLocation +
-        " numSplitsInGroup: " + numSplitsInGroup + 
-        " totalLength: " + totalLength +
-        " numOriginalSplits: " + originalSplits.length +
-        " . Grouping by length: " + groupByLength + " count: " + groupByCount);
-    
-    // go through locations and group splits
-    int splitsProcessed = 0;
-    List<SplitHolder> group = new ArrayList<SplitHolder>(numSplitsInGroup+1);
-    Set<String> groupLocationSet = new HashSet<String>(10);
-    boolean allowSmallGroups = false;
-    boolean doingRackLocal = false;
-    int iterations = 0;
-    while (splitsProcessed < originalSplits.length) {
-      iterations++;
-      int numFullGroupsCreated = 0;
-      for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
-        group.clear();
-        groupLocationSet.clear();
-        String location = entry.getKey();
-        LocationHolder holder = entry.getValue();
-        SplitHolder splitHolder = holder.getUnprocessedHeadSplit();
-        if (splitHolder == null) {
-          // all splits on node processed
-          continue;
-        }
-        int oldHeadIndex = holder.headIndex;
-        long groupLength = 0;
-        int groupNumSplits = 0;
-        do {
-          group.add(splitHolder);
-          groupLength += splitHolder.split.getLength();
-          groupNumSplits++;
-          holder.incrementHeadIndex();
-          splitHolder = holder.getUnprocessedHeadSplit();
-        } while(splitHolder != null  
-            && (!groupByLength || 
-                (groupLength + splitHolder.split.getLength() <= lengthPerGroup))
-            && (!groupByCount || 
-                (groupNumSplits + 1 <= numSplitsInGroup)));
-
-        if (holder.isEmpty() 
-            && !allowSmallGroups
-            && (!groupByLength || groupLength < lengthPerGroup/2)
-            && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
-          // group too small, reset it
-          holder.headIndex = oldHeadIndex;
-          continue;
-        }
-        
-        numFullGroupsCreated++;
-
-        // One split group created
-        String[] groupLocation = {location};
-        if (location == emptyLocation) {
-          groupLocation = null;
-        } else if (doingRackLocal) {
-          for (SplitHolder splitH : group) {
-            String[] locations = splitH.split.getLocations();
-            if (locations != null) {
-              for (String loc : locations) {
-                if (loc != null) {
-                  groupLocationSet.add(loc);
-                }
-              }
-            }
-          }
-          groupLocation = groupLocationSet.toArray(groupLocation);
-        }
-        TezGroupedSplit groupedSplit = 
-            new TezGroupedSplit(group.size(), wrappedInputFormatName, 
-                groupLocation, 
-                // pass rack local hint directly to AM
-                ((doingRackLocal && location != emptyLocation)?location:null));
-        for (SplitHolder groupedSplitHolder : group) {
-          groupedSplit.addSplit(groupedSplitHolder.split);
-          Preconditions.checkState(groupedSplitHolder.isProcessed == false, 
-              "Duplicates in grouping at location: " + location);
-          groupedSplitHolder.isProcessed = true;
-          splitsProcessed++;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Grouped " + group.size()
-              + " length: " + groupedSplit.getLength()
-              + " split at: " + location);
-        }
-        groupedSplitsList.add(groupedSplit);
-      }
-      
-      if (!doingRackLocal && numFullGroupsCreated < 1) {
-        // no node could create a node-local group. go rack-local
-        doingRackLocal = true;
-        // re-create locations
-        int numRemainingSplits = originalSplits.length - splitsProcessed;
-        Set<InputSplit> remainingSplits = new HashSet<InputSplit>(numRemainingSplits);
-        // gather remaining splits.
-        for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
-          LocationHolder locHolder = entry.getValue();
-          while (!locHolder.isEmpty()) {
-            SplitHolder splitHolder = locHolder.getUnprocessedHeadSplit();
-            if (splitHolder != null) {
-              remainingSplits.add(splitHolder.split);
-              locHolder.incrementHeadIndex();
-            }
-          }
-        }
-        if (remainingSplits.size() != numRemainingSplits) {
-          throw new TezUncheckedException("Expected: " + numRemainingSplits 
-              + " got: " + remainingSplits.size());
-        }
-        
-        // doing all this now instead of up front because the number of remaining
-        // splits is expected to be much smaller
-        RackResolver.init(conf);
-        Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());
-        Map<String, LocationHolder> rackLocations = new HashMap<String, LocationHolder>();
-        for (String location : distinctLocations.keySet()) {
-          String rack = emptyLocation;
-          if (location != emptyLocation) {
-            rack = RackResolver.resolve(location).getNetworkLocation();
-          }
-          locToRackMap.put(location, rack);
-          if (rackLocations.get(rack) == null) {
-            // splits will probably be located in all racks
-            rackLocations.put(rack, new LocationHolder(numRemainingSplits));
-          }
-        }
-        HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
-        for (InputSplit split : remainingSplits) {
-          rackSet.clear();
-          SplitHolder splitHolder = new SplitHolder(split);
-          String[] locations = split.getLocations();
-          if (locations == null || locations.length == 0) {
-            locations = emptyLocations;
-          }
-          for (String location : locations ) {
-            if ( location == null) {
-              location = emptyLocation;
-            }
-            rackSet.add(locToRackMap.get(location));
-          }
-          for (String rack : rackSet) {
-            rackLocations.get(rack).splits.add(splitHolder);
-          }
-        }
-        
-        distinctLocations.clear();
-        distinctLocations = rackLocations;
-        // adjust split length to be smaller because the data is non local
-        float rackSplitReduction = job.getFloat(
-            TezConfiguration.TEZ_AM_GROUPING_RACK_SPLIT_SIZE_REDUCTION,
-            TezConfiguration.TEZ_AM_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT);
-        if (rackSplitReduction > 0) {
-          long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction);
-          int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction);
-          if (newLengthPerGroup > 0) {
-            lengthPerGroup = newLengthPerGroup;
-          }
-          if (newNumSplitsInGroup > 0) {
-            numSplitsInGroup = newNumSplitsInGroup;
-          }
-        }
-        
-        LOG.info("Doing rack local after iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplitsList.size() +
-            " lengthPerGroup: " + lengthPerGroup +
-            " numSplitsInGroup: " + numSplitsInGroup);
-        
-        // dont do smallGroups for the first pass
-        continue;
-      }
-      
-      if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) {
-        // a few nodes have a lot of data or data is thinly spread across nodes
-        // so allow small groups now        
-        allowSmallGroups = true;
-        LOG.info("Allowing small groups after iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplitsList.size());
-      }
-      
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplitsList.size());
-      }
-    }
-    InputSplit[] groupedSplits = new InputSplit[groupedSplitsList.size()];
-    groupedSplitsList.toArray(groupedSplits);
-    LOG.info("Number of splits desired: " + desiredNumSplits + 
-        " created: " + groupedSplitsList.size() + 
-        " splitsProcessed: " + splitsProcessed);
-    return groupedSplits;
+    return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
new file mode 100644
index 0000000..0e1c889
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.split;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+import com.google.common.base.Preconditions;
+
+public class TezMapredSplitsGrouper {
+  private static final Log LOG = LogFactory.getLog(TezMapredSplitsGrouper.class);
+
+  /**
+   * Pairs one original split with a flag recording whether the grouping loop
+   * has already placed it into a group.
+   *
+   * Declared static because it never touches the enclosing grouper; a
+   * non-static inner class would carry a hidden reference to it in every
+   * holder.
+   */
+  static class SplitHolder {
+    // the wrapped original split; never reassigned after construction
+    final InputSplit split;
+    // set to true by the grouping loop once the split has been added to a group
+    boolean isProcessed = false;
+    SplitHolder(InputSplit split) {
+      this.split = split;
+    }
+  }
+  
+  /**
+   * The splits associated with one location key, plus a cursor
+   * ({@code headIndex}) pointing at the next candidate split.
+   *
+   * Declared static because it never touches the enclosing grouper; a
+   * non-static inner class would carry a hidden reference to it in every
+   * holder.
+   */
+  static class LocationHolder {
+    final List<SplitHolder> splits;
+    // index of the next candidate split; callers may save and restore it to
+    // undo a tentative group
+    int headIndex = 0;
+    LocationHolder(int capacity) {
+      splits = new ArrayList<SplitHolder>(capacity);
+    }
+    /** Returns true once the cursor has moved past the last split in the list. */
+    boolean isEmpty() {
+      return (headIndex == splits.size());
+    }
+    /**
+     * Moves the cursor past any splits already marked as processed and returns
+     * the first unprocessed one without consuming it.
+     *
+     * @return the split at the cursor, or null when no unprocessed split remains
+     */
+    SplitHolder getUnprocessedHeadSplit() {
+      while (!isEmpty()) {
+        SplitHolder holder = splits.get(headIndex);
+        if (!holder.isProcessed) {
+          return holder;
+        }
+        incrementHeadIndex();
+      }
+      return null;
+    }
+    /** Advances the cursor by one position. */
+    void incrementHeadIndex() {
+      headIndex++;
+    }
+  }
+  
+  public InputSplit[] getGroupedSplits(Configuration conf,
+      InputSplit[] originalSplits, int desiredNumSplits,
+      String wrappedInputFormatName) throws IOException {
+    LOG.info("Grouping splits in Tez");
+
+    int configNumSplits = conf.getInt(TezConfiguration.TEZ_AM_GROUPING_SPLIT_COUNT, 0);
+    if (configNumSplits > 0) {
+      // always use config override if specified
+      desiredNumSplits = configNumSplits;
+      LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
+    }
+    
+    if (! (configNumSplits > 0 || 
+          originalSplits == null || 
+          originalSplits.length == 0) ) {
+      // numSplits has not been overridden by config
+      // numSplits has been set at runtime
+      // there are splits generated
+      // Do sanity checks
+      long totalLength = 0;
+      for (InputSplit split : originalSplits) {
+        totalLength += split.getLength();
+      }
+
+      int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.length;
+      long lengthPerGroup = totalLength/splitCount;
+      
+      long maxLengthPerGroup = conf.getLong(
+          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MAX_SIZE,
+          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
+      long minLengthPerGroup = conf.getLong(
+          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MIN_SIZE,
+          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MIN_SIZE_DEFAULT);
+      if (maxLengthPerGroup < minLengthPerGroup || 
+          minLengthPerGroup <=0) {
+        throw new TezUncheckedException(
+          "Invalid max/min group lengths. Required min>0, max>=min. " +
+          " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);
+      }
+      if (lengthPerGroup > maxLengthPerGroup) {
+        // splits too big to work. Need to override with max size.
+        int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
+        LOG.info("Desired splits: " + desiredNumSplits + " too small. " + 
+            " Desired splitLength: " + lengthPerGroup + 
+            " Max splitLength: " + maxLengthPerGroup +
+            " New desired splits: " + newDesiredNumSplits + 
+            " Total length: " + totalLength +
+            " Original splits: " + originalSplits.length);
+        
+        desiredNumSplits = newDesiredNumSplits;
+      } else if (lengthPerGroup < minLengthPerGroup) {
+        // splits too small to work. Need to override with size.
+        int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
+        LOG.info("Desired splits: " + desiredNumSplits + " too large. " + 
+            " Desired splitLength: " + lengthPerGroup + 
+            " Min splitLength: " + minLengthPerGroup +
+            " New desired splits: " + newDesiredNumSplits + 
+            " Total length: " + totalLength +
+            " Original splits: " + originalSplits.length);
+        
+        desiredNumSplits = newDesiredNumSplits;
+      }
+    }
+    
+    if (originalSplits == null) {
+      LOG.info("Null original splits");
+      return null;
+    }
+    
+    if (desiredNumSplits == 0 ||
+        originalSplits.length == 0 ||
+        desiredNumSplits >= originalSplits.length) {
+      // nothing set. so return all the splits as is
+      LOG.info("Using original number of splits: " + originalSplits.length +
+          " desired splits: " + desiredNumSplits);
+      InputSplit[] groupedSplits = new TezGroupedSplit[originalSplits.length];
+      int i=0;
+      for (InputSplit split : originalSplits) {
+        TezGroupedSplit newSplit = 
+            new TezGroupedSplit(1, wrappedInputFormatName, split.getLocations());
+        newSplit.addSplit(split);
+        groupedSplits[i++] = newSplit;
+      }
+      return groupedSplits;
+    }
+    
+    String emptyLocation = "EmptyLocation";
+    String[] emptyLocations = {emptyLocation};
+    List<InputSplit> groupedSplitsList = new ArrayList<InputSplit>(desiredNumSplits);
+    
+    long totalLength = 0;
+    Map<String, LocationHolder> distinctLocations = new HashMap<String, LocationHolder>();
+    // go through splits and add them to locations
+    for (InputSplit split : originalSplits) {
+      totalLength += split.getLength();
+      String[] locations = split.getLocations();
+      if (locations == null || locations.length == 0) {
+        locations = emptyLocations;
+      }
+      for (String location : locations ) {
+        if (location == null) {
+          location = emptyLocation;
+        }
+        distinctLocations.put(location, null);
+      }
+    }
+    
+    long lengthPerGroup = totalLength/desiredNumSplits;
+    int numNodeLocations = distinctLocations.size();
+    int numSplitsPerLocation = originalSplits.length/numNodeLocations;
+    int numSplitsInGroup = originalSplits.length/desiredNumSplits;
+
+    // allocation loop here so that we have a good initial size for the lists
+    for (String location : distinctLocations.keySet()) {
+      distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
+    }
+    
+    Set<String> locSet = new HashSet<String>();
+    for (InputSplit split : originalSplits) {
+      locSet.clear();
+      SplitHolder splitHolder = new SplitHolder(split);
+      String[] locations = split.getLocations();
+      if (locations == null || locations.length == 0) {
+        locations = emptyLocations;
+      }
+      for (String location : locations) {
+        if (location == null) {
+          location = emptyLocation;
+        }
+        locSet.add(location);
+      }
+      for (String location : locSet) {
+        LocationHolder holder = distinctLocations.get(location);
+        holder.splits.add(splitHolder);
+      }
+    }
+    
+    boolean groupByLength = conf.getBoolean(
+        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_LENGTH,
+        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
+    boolean groupByCount = conf.getBoolean(
+        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_COUNT,
+        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_COUNT_DEFAULT);
+    if (!(groupByLength || groupByCount)) {
+      throw new TezUncheckedException(
+          "None of the grouping parameters are true: "
+              + TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_LENGTH + ", "
+              + TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_COUNT);
+    }
+    LOG.info("Desired numSplits: " + desiredNumSplits +
+        " lengthPerGroup: " + lengthPerGroup +
+        " numLocations: " + numNodeLocations +
+        " numSplitsPerLocation: " + numSplitsPerLocation +
+        " numSplitsInGroup: " + numSplitsInGroup + 
+        " totalLength: " + totalLength +
+        " numOriginalSplits: " + originalSplits.length +
+        " . Grouping by length: " + groupByLength + " count: " + groupByCount);
+    
+    // go through locations and group splits
+    int splitsProcessed = 0;
+    List<SplitHolder> group = new ArrayList<SplitHolder>(numSplitsInGroup+1);
+    Set<String> groupLocationSet = new HashSet<String>(10);
+    boolean allowSmallGroups = false;
+    boolean doingRackLocal = false;
+    int iterations = 0;
+    while (splitsProcessed < originalSplits.length) {
+      iterations++;
+      int numFullGroupsCreated = 0;
+      for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
+        group.clear();
+        groupLocationSet.clear();
+        String location = entry.getKey();
+        LocationHolder holder = entry.getValue();
+        SplitHolder splitHolder = holder.getUnprocessedHeadSplit();
+        if (splitHolder == null) {
+          // all splits on node processed
+          continue;
+        }
+        int oldHeadIndex = holder.headIndex;
+        long groupLength = 0;
+        int groupNumSplits = 0;
+        do {
+          group.add(splitHolder);
+          groupLength += splitHolder.split.getLength();
+          groupNumSplits++;
+          holder.incrementHeadIndex();
+          splitHolder = holder.getUnprocessedHeadSplit();
+        } while(splitHolder != null  
+            && (!groupByLength || 
+                (groupLength + splitHolder.split.getLength() <= lengthPerGroup))
+            && (!groupByCount || 
+                (groupNumSplits + 1 <= numSplitsInGroup)));
+
+        if (holder.isEmpty() 
+            && !allowSmallGroups
+            && (!groupByLength || groupLength < lengthPerGroup/2)
+            && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
+          // group too small, reset it
+          holder.headIndex = oldHeadIndex;
+          continue;
+        }
+        
+        numFullGroupsCreated++;
+
+        // One split group created
+        String[] groupLocation = {location};
+        if (location == emptyLocation) {
+          groupLocation = null;
+        } else if (doingRackLocal) {
+          for (SplitHolder splitH : group) {
+            String[] locations = splitH.split.getLocations();
+            if (locations != null) {
+              for (String loc : locations) {
+                if (loc != null) {
+                  groupLocationSet.add(loc);
+                }
+              }
+            }
+          }
+          groupLocation = groupLocationSet.toArray(groupLocation);
+        }
+        TezGroupedSplit groupedSplit = 
+            new TezGroupedSplit(group.size(), wrappedInputFormatName, 
+                groupLocation, 
+                // pass rack local hint directly to AM
+                ((doingRackLocal && location != emptyLocation)?location:null));
+        for (SplitHolder groupedSplitHolder : group) {
+          groupedSplit.addSplit(groupedSplitHolder.split);
+          Preconditions.checkState(groupedSplitHolder.isProcessed == false, 
+              "Duplicates in grouping at location: " + location);
+          groupedSplitHolder.isProcessed = true;
+          splitsProcessed++;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Grouped " + group.size()
+              + " length: " + groupedSplit.getLength()
+              + " split at: " + location);
+        }
+        groupedSplitsList.add(groupedSplit);
+      }
+      
+      if (!doingRackLocal && numFullGroupsCreated < 1) {
+        // no node could create a node-local group. go rack-local
+        doingRackLocal = true;
+        // re-create locations
+        int numRemainingSplits = originalSplits.length - splitsProcessed;
+        Set<InputSplit> remainingSplits = new HashSet<InputSplit>(numRemainingSplits);
+        // gather remaining splits.
+        for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
+          LocationHolder locHolder = entry.getValue();
+          while (!locHolder.isEmpty()) {
+            SplitHolder splitHolder = locHolder.getUnprocessedHeadSplit();
+            if (splitHolder != null) {
+              remainingSplits.add(splitHolder.split);
+              locHolder.incrementHeadIndex();
+            }
+          }
+        }
+        if (remainingSplits.size() != numRemainingSplits) {
+          throw new TezUncheckedException("Expected: " + numRemainingSplits 
+              + " got: " + remainingSplits.size());
+        }
+        
+        // doing all this now instead of up front because the number of remaining
+        // splits is expected to be much smaller
+        RackResolver.init(conf);
+        Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());
+        Map<String, LocationHolder> rackLocations = new HashMap<String, LocationHolder>();
+        for (String location : distinctLocations.keySet()) {
+          String rack = emptyLocation;
+          if (location != emptyLocation) {
+            rack = RackResolver.resolve(location).getNetworkLocation();
+          }
+          locToRackMap.put(location, rack);
+          if (rackLocations.get(rack) == null) {
+            // splits will probably be located in all racks
+            rackLocations.put(rack, new LocationHolder(numRemainingSplits));
+          }
+        }
+        HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
+        for (InputSplit split : remainingSplits) {
+          rackSet.clear();
+          SplitHolder splitHolder = new SplitHolder(split);
+          String[] locations = split.getLocations();
+          if (locations == null || locations.length == 0) {
+            locations = emptyLocations;
+          }
+          for (String location : locations ) {
+            if ( location == null) {
+              location = emptyLocation;
+            }
+            rackSet.add(locToRackMap.get(location));
+          }
+          for (String rack : rackSet) {
+            rackLocations.get(rack).splits.add(splitHolder);
+          }
+        }
+        
+        distinctLocations.clear();
+        distinctLocations = rackLocations;
+        // adjust split length to be smaller because the data is non local
+        float rackSplitReduction = conf.getFloat(
+            TezConfiguration.TEZ_AM_GROUPING_RACK_SPLIT_SIZE_REDUCTION,
+            TezConfiguration.TEZ_AM_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT);
+        if (rackSplitReduction > 0) {
+          long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction);
+          int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction);
+          if (newLengthPerGroup > 0) {
+            lengthPerGroup = newLengthPerGroup;
+          }
+          if (newNumSplitsInGroup > 0) {
+            numSplitsInGroup = newNumSplitsInGroup;
+          }
+        }
+        
+        LOG.info("Doing rack local after iteration: " + iterations +
+            " splitsProcessed: " + splitsProcessed + 
+            " numFullGroupsInRound: " + numFullGroupsCreated +
+            " totalGroups: " + groupedSplitsList.size() +
+            " lengthPerGroup: " + lengthPerGroup +
+            " numSplitsInGroup: " + numSplitsInGroup);
+        
+        // dont do smallGroups for the first pass
+        continue;
+      }
+      
+      if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) {
+        // a few nodes have a lot of data or data is thinly spread across nodes
+        // so allow small groups now        
+        allowSmallGroups = true;
+        LOG.info("Allowing small groups after iteration: " + iterations +
+            " splitsProcessed: " + splitsProcessed + 
+            " numFullGroupsInRound: " + numFullGroupsCreated +
+            " totalGroups: " + groupedSplitsList.size());
+      }
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Iteration: " + iterations +
+            " splitsProcessed: " + splitsProcessed + 
+            " numFullGroupsInRound: " + numFullGroupsCreated +
+            " totalGroups: " + groupedSplitsList.size());
+      }
+    }
+    InputSplit[] groupedSplits = new InputSplit[groupedSplitsList.size()];
+    groupedSplitsList.toArray(groupedSplits);
+    LOG.info("Number of splits desired: " + desiredNumSplits + 
+        " created: " + groupedSplitsList.size() + 
+        " splitsProcessed: " + splitsProcessed);
+    return groupedSplits;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25cb8930/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 2b17c11..ce1e5c5 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -49,7 +49,6 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
 
   InputFormat<K, V> wrappedInputFormat;
   int desiredNumSplits = 0;
-  List<InputSplit> groupedSplits = null;
   Configuration conf;
   
   public TezGroupedSplitsInputFormat() {
@@ -106,348 +105,10 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException,
       InterruptedException {
-    LOG.info("Grouping splits in Tez");
-
-    int configNumSplits = conf.getInt(TezConfiguration.TEZ_AM_GROUPING_SPLIT_COUNT, 0);
-    if (configNumSplits > 0) {
-      // always use config override if specified
-      desiredNumSplits = configNumSplits;
-      LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
-    }
-    
     List<InputSplit> originalSplits = wrappedInputFormat.getSplits(context);
-    
-    if (! (configNumSplits > 0 || 
-          originalSplits == null || 
-          originalSplits.size() == 0)) {
-      // numSplits has not been overridden by config
-      // numSplits has been set at runtime
-      // there are splits generated
-      // desired splits is less than number of splits generated
-      // Do sanity checks
-      long totalLength = 0;
-      for (InputSplit split : originalSplits) {
-        totalLength += split.getLength();
-      }
-  
-      int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size();
-      long lengthPerGroup = totalLength/splitCount;
-
-      long maxLengthPerGroup = conf.getLong(
-          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MAX_SIZE,
-          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
-      long minLengthPerGroup = conf.getLong(
-          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MIN_SIZE,
-          TezConfiguration.TEZ_AM_GROUPING_SPLIT_MIN_SIZE_DEFAULT);
-      if (maxLengthPerGroup < minLengthPerGroup || 
-          minLengthPerGroup <=0) {
-        throw new TezUncheckedException(
-          "Invalid max/min group lengths. Required min>0, max>=min. " +
-          " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);
-      }
-      if (lengthPerGroup > maxLengthPerGroup) {
-        // splits too big to work. Need to override with max size.
-        int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
-        LOG.info("Desired splits: " + desiredNumSplits + " too small. " + 
-            " Desired splitLength: " + lengthPerGroup + 
-            " Max splitLength: " + maxLengthPerGroup +
-            " New desired splits: " + newDesiredNumSplits + 
-            " Total length: " + totalLength +
-            " Original splits: " + originalSplits.size());
-        
-        desiredNumSplits = newDesiredNumSplits;
-      } else if (lengthPerGroup < minLengthPerGroup) {
-        // splits too small to work. Need to override with size.
-        int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
-        LOG.info("Desired splits: " + desiredNumSplits + " too large. " + 
-            " Desired splitLength: " + lengthPerGroup + 
-            " Min splitLength: " + minLengthPerGroup +
-            " New desired splits: " + newDesiredNumSplits + 
-            " Total length: " + totalLength +
-            " Original splits: " + originalSplits.size());
-        
-        desiredNumSplits = newDesiredNumSplits;
-      }
-    }
-     
+    TezMapReduceSplitsGrouper grouper = new TezMapReduceSplitsGrouper();
     String wrappedInputFormatName = wrappedInputFormat.getClass().getName();
-    if (desiredNumSplits == 0 ||
-        originalSplits.size() == 0 ||
-        desiredNumSplits >= originalSplits.size()) {
-      // nothing set. so return all the splits as is
-      LOG.info("Using original number of splits: " + originalSplits.size() +
-          " desired splits: " + desiredNumSplits);
-      groupedSplits = new ArrayList<InputSplit>(originalSplits.size());
-      for (InputSplit split : originalSplits) {
-        TezGroupedSplit newSplit = 
-            new TezGroupedSplit(1, wrappedInputFormatName, split.getLocations());
-        newSplit.addSplit(split);
-        groupedSplits.add(newSplit);
-      }
-      return groupedSplits;
-    }
-    
-    String emptyLocation = "EmptyLocation";
-    String[] emptyLocations = {emptyLocation};
-    groupedSplits = new ArrayList<InputSplit>(desiredNumSplits);
-    
-    long totalLength = 0;
-    Map<String, LocationHolder> distinctLocations = new HashMap<String, LocationHolder>();
-    // go through splits and add them to locations
-    for (InputSplit split : originalSplits) {
-      totalLength += split.getLength();
-      String[] locations = split.getLocations();
-      if (locations == null || locations.length == 0) {
-        locations = emptyLocations;
-      }
-      for (String location : locations ) {
-        if (location == null) {
-          location = emptyLocation;
-        }
-        distinctLocations.put(location, null);
-      }
-    }
-    
-    long lengthPerGroup = totalLength/desiredNumSplits;
-    int numNodeLocations = distinctLocations.size();
-    int numSplitsPerLocation = originalSplits.size()/numNodeLocations;
-    int numSplitsInGroup = originalSplits.size()/desiredNumSplits;
-    
-    // allocation loop here so that we have a good initial size for the lists
-    for (String location : distinctLocations.keySet()) {
-      distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
-    }
-    
-    Set<String> locSet = new HashSet<String>();
-    for (InputSplit split : originalSplits) {
-      locSet.clear();
-      SplitHolder splitHolder = new SplitHolder(split);
-      String[] locations = split.getLocations();
-      if (locations == null || locations.length == 0) {
-        locations = emptyLocations;
-      }
-      for (String location : locations) {
-        if (location == null) {
-          location = emptyLocation;
-        }
-        locSet.add(location);
-      }
-      for (String location : locSet) {
-        LocationHolder holder = distinctLocations.get(location);
-        holder.splits.add(splitHolder);
-      }
-    }
-    
-    boolean groupByLength = conf.getBoolean(
-        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_LENGTH,
-        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
-    boolean groupByCount = conf.getBoolean(
-        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_COUNT,
-        TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_COUNT_DEFAULT);
-    if (!(groupByLength || groupByCount)) {
-      throw new TezUncheckedException(
-          "None of the grouping parameters are true: "
-              + TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_LENGTH + ", "
-              + TezConfiguration.TEZ_AM_GROUPING_SPLIT_BY_COUNT);
-    }
-    LOG.info("Desired numSplits: " + desiredNumSplits +
-        " lengthPerGroup: " + lengthPerGroup +
-        " numLocations: " + numNodeLocations +
-        " numSplitsPerLocation: " + numSplitsPerLocation +
-        " numSplitsInGroup: " + numSplitsInGroup + 
-        " totalLength: " + totalLength +
-        " numOriginalSplits: " + originalSplits.size() +
-        " . Grouping by length: " + groupByLength + " count: " + groupByCount);
-    
-    // go through locations and group splits
-    int splitsProcessed = 0;
-    List<SplitHolder> group = new ArrayList<SplitHolder>(numSplitsInGroup);
-    Set<String> groupLocationSet = new HashSet<String>(10);
-    boolean allowSmallGroups = false;
-    boolean doingRackLocal = false;
-    int iterations = 0;
-    while (splitsProcessed < originalSplits.size()) {
-      iterations++;
-      int numFullGroupsCreated = 0;
-      for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
-        group.clear();
-        groupLocationSet.clear();
-        String location = entry.getKey();
-        LocationHolder holder = entry.getValue();
-        SplitHolder splitHolder = holder.getUnprocessedHeadSplit();
-        if (splitHolder == null) {
-          // all splits on node processed
-          continue;
-        }
-        int oldHeadIndex = holder.headIndex;
-        long groupLength = 0;
-        int groupNumSplits = 0;
-        do {
-          group.add(splitHolder);
-          groupLength += splitHolder.split.getLength();
-          groupNumSplits++;
-          holder.incrementHeadIndex();
-          splitHolder = holder.getUnprocessedHeadSplit();
-        } while(splitHolder != null  
-            && (!groupByLength || 
-                (groupLength + splitHolder.split.getLength() <= lengthPerGroup))
-            && (!groupByCount || 
-                (groupNumSplits + 1 <= numSplitsInGroup)));
-
-        if (holder.isEmpty() 
-            && !allowSmallGroups
-            && (!groupByLength || groupLength < lengthPerGroup/2)
-            && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
-          // group too small, reset it
-          holder.headIndex = oldHeadIndex;
-          continue;
-        }
-        
-        numFullGroupsCreated++;
-
-        // One split group created
-        String[] groupLocation = {location};
-        if (location == emptyLocation) {
-          groupLocation = null;
-        } else if (doingRackLocal) {
-          for (SplitHolder splitH : group) {
-            String[] locations = splitH.split.getLocations();
-            if (locations != null) {
-              for (String loc : locations) {
-                if (loc != null) {
-                  groupLocationSet.add(loc);
-                }
-              }
-            }
-          }
-          groupLocation = groupLocationSet.toArray(groupLocation);
-        }
-        TezGroupedSplit groupedSplit = 
-            new TezGroupedSplit(group.size(), wrappedInputFormatName, 
-                groupLocation,
-                // pass rack local hint directly to AM
-                ((doingRackLocal && location != emptyLocation)?location:null));
-        for (SplitHolder groupedSplitHolder : group) {
-          groupedSplit.addSplit(groupedSplitHolder.split);
-          Preconditions.checkState(groupedSplitHolder.isProcessed == false,
-              "Duplicates in grouping at location: " + location);
-          groupedSplitHolder.isProcessed = true;
-          splitsProcessed++;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Grouped " + group.size()
-              + " length: " + groupedSplit.getLength()
-              + " split at: " + location);
-        }
-        groupedSplits.add(groupedSplit);
-      }
-      
-      if (!doingRackLocal && numFullGroupsCreated < 1) {
-        // no node could create a node-local group. go rack-local
-        doingRackLocal = true;
-        // re-create locations
-        int numRemainingSplits = originalSplits.size() - splitsProcessed;
-        Set<InputSplit> remainingSplits = new HashSet<InputSplit>(numRemainingSplits);
-        // gather remaining splits.
-        for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
-          LocationHolder locHolder = entry.getValue();
-          while (!locHolder.isEmpty()) {
-            SplitHolder splitHolder = locHolder.getUnprocessedHeadSplit();
-            if (splitHolder != null) {
-              remainingSplits.add(splitHolder.split);
-              locHolder.incrementHeadIndex();
-            }
-          }
-        }
-        if (remainingSplits.size() != numRemainingSplits) {
-          throw new TezUncheckedException("Expected: " + numRemainingSplits 
-              + " got: " + remainingSplits.size());
-        }
-        
-        // doing all this now instead of up front because the number of remaining
-        // splits is expected to be much smaller
-        RackResolver.init(conf);
-        Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());
-        Map<String, LocationHolder> rackLocations = new HashMap<String, LocationHolder>();
-        for (String location : distinctLocations.keySet()) {
-          String rack = emptyLocation;
-          if (location != emptyLocation) {
-            rack = RackResolver.resolve(location).getNetworkLocation();
-          }
-          locToRackMap.put(location, rack);
-          if (rackLocations.get(rack) == null) {
-            // splits will probably be located in all racks
-            rackLocations.put(rack, new LocationHolder(numRemainingSplits));
-          }
-        }
-        HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
-        for (InputSplit split : remainingSplits) {
-          rackSet.clear();
-          SplitHolder splitHolder = new SplitHolder(split);
-          String[] locations = split.getLocations();
-          if (locations == null || locations.length == 0) {
-            locations = emptyLocations;
-          }
-          for (String location : locations ) {
-            if (location == null) {
-              location = emptyLocation;
-            }
-            rackSet.add(locToRackMap.get(location));
-          }
-          for (String rack : rackSet) {
-            rackLocations.get(rack).splits.add(splitHolder);
-          }
-        }
-        
-        distinctLocations.clear();
-        distinctLocations = rackLocations;
-        // adjust split length to be smaller because the data is non local
-        float rackSplitReduction = conf.getFloat(
-            TezConfiguration.TEZ_AM_GROUPING_RACK_SPLIT_SIZE_REDUCTION,
-            TezConfiguration.TEZ_AM_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT);
-        if (rackSplitReduction > 0) {
-          long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction);
-          int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction);
-          if (newLengthPerGroup > 0) {
-            lengthPerGroup = newLengthPerGroup;
-          }
-          if (newNumSplitsInGroup > 0) {
-            numSplitsInGroup = newNumSplitsInGroup;
-          }
-        }
-        
-        LOG.info("Doing rack local after iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplits.size() +
-            " lengthPerGroup: " + lengthPerGroup +
-            " numSplitsInGroup: " + numSplitsInGroup);
-        
-        // dont do smallGroups for the first pass
-        continue;
-      }
-      
-      if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) {
-        // a few nodes have a lot of data or data is thinly spread across nodes
-        // so allow small groups now        
-        allowSmallGroups = true;
-        LOG.info("Allowing small groups after iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplits.size());
-      }
-      
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplits.size());
-      }
-    }
-    LOG.info("Number of splits desired: " + desiredNumSplits + 
-        " created: " + groupedSplits.size() + 
-        " splitsProcessed: " + splitsProcessed);
-    return groupedSplits;
+    return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName);
   }
 
   @Override


Mime
View raw message