accumulo-commits mailing list archives

From cjno...@apache.org
Subject git commit: Pulled William Slacum's multi-table input format into the current design with the InputConfigurator. ACCUMULO-391
Date Thu, 12 Sep 2013 01:13:29 GMT
Updated Branches:
  refs/heads/ACCUMULO-391 9e021f7eb -> e781879d2


Pulled William Slacum's multi-table input format into the current design with the InputConfigurator. ACCUMULO-391


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e781879d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e781879d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e781879d

Branch: refs/heads/ACCUMULO-391
Commit: e781879d217476793c18725476453a58e9258d02
Parents: 9e021f7
Author: Corey J. Nolet <cjnolet@gmail.com>
Authored: Wed Sep 11 21:11:46 2013 -0400
Committer: Corey J. Nolet <cjnolet@gmail.com>
Committed: Wed Sep 11 21:11:46 2013 -0400

----------------------------------------------------------------------
 .../core/client/mapred/InputFormatBase.java     | 173 +++++----
 .../core/client/mapreduce/InputFormatBase.java  | 334 +++++++++++-----
 .../mapreduce/lib/support/TableRange.java       |  62 +++
 .../mapreduce/lib/util/InputConfigurator.java   | 152 ++++++--
 .../client/mapreduce/multi/InputFormatBase.java | 378 +++++++++++--------
 5 files changed, 721 insertions(+), 378 deletions(-)
----------------------------------------------------------------------
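
For context, a minimal sketch of how a MapReduce job might drive the multi-table API introduced in this commit. This is a sketch only: it assumes AccumuloInputFormat (or any concrete subclass of the mapreduce InputFormatBase) exposes the new static setters, that connection info is configured through the usual ConfiguratorBase methods, and the table names are placeholders.

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class MultiTableJobSetup {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        job.setInputFormatClass(AccumuloInputFormat.class);

        // scan two tables in a single job (table names are placeholders)
        AccumuloInputFormat.setInputTableNames(job, Arrays.asList("events", "metadata"));

        // per-table ranges: key = table name, value = the ranges to scan in that table
        Map<String,Collection<Range>> ranges = new HashMap<String,Collection<Range>>();
        ranges.put("events", Collections.singleton(new Range("row_a", "row_m")));
        ranges.put("metadata", Collections.singleton(new Range())); // empty range = whole table
        AccumuloInputFormat.setRanges(job, ranges);
      }
    }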


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e781879d/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
index 9f16ab8..74373f6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -68,6 +69,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import static org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer.deserialize;
+
 /**
  * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs.
  * <p>
@@ -280,7 +283,11 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @see #setInputTableName(JobConf, String)
    */
   protected static String getInputTableName(JobConf job) {
-    return InputConfigurator.getInputTableName(CLASS, job);
+    String[] tableNames = InputConfigurator.getInputTableNames(CLASS, job);
+    if (tableNames.length > 0)
+      return tableNames[0];
+    else
+      return null;
   }
   
   /**
@@ -333,7 +340,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @since 1.5.0
    * @see #setRanges(JobConf, Collection)
    */
-  protected static List<Range> getRanges(JobConf job) throws IOException {
+  protected static Map<String, List<Range>> getRanges(JobConf job) throws IOException {
     return InputConfigurator.getRanges(CLASS, job);
   }
   
@@ -538,8 +545,8 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  protected static TabletLocator getTabletLocator(JobConf job) throws TableNotFoundException {
-    return InputConfigurator.getTabletLocator(CLASS, job);
+  protected static TabletLocator getTabletLocator(JobConf job, String tableName) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, job, tableName);
   }
   
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -766,94 +773,100 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     log.setLevel(getLogLevel(job));
     validateOptions(job);
-    
-    String tableName = getInputTableName(job);
+
     boolean autoAdjust = getAutoAdjustRanges(job);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job)) : getRanges(job);
-    
-    if (ranges.isEmpty()) {
-      ranges = new ArrayList<Range>(1);
-      ranges.add(new Range());
-    }
-    
-    // get the metadata information for these ranges
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    TabletLocator tl;
-    try {
-      if (isOfflineScan(job)) {
-        binnedRanges = binOfflineTable(job, tableName, ranges);
-        while (binnedRanges == null) {
-          // Some tablets were still online, try again
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+    Map<String,List<Range>> tablesRanges = getRanges(job);
+    LinkedList<InputSplit> splits = new LinkedList<InputSplit>(); // mapred InputSplit, matching this class's return type
+
+    for (Entry<String,List<Range>> tableRanges : tablesRanges.entrySet()) {
+      String tableName = tableRanges.getKey();
+      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableRanges.getValue()) : tableRanges.getValue();
+
+      if (ranges.isEmpty()) {
+        ranges = new ArrayList<Range>(1);
+        ranges.add(new Range());
+      }
+
+      // get the metadata information for these ranges
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+      TabletLocator tl;
+      try {
+        if (isOfflineScan(job)) {
           binnedRanges = binOfflineTable(job, tableName, ranges);
-        }
-      } else {
-        Instance instance = getInstance(job);
-        String tableId = null;
-        tl = getTabletLocator(job);
-        // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
-        tl.invalidateCache();
-        while (!tl.binRanges(new Credentials(getPrincipal(job), AuthenticationTokenSerializer.deserialize(getTokenClass(job), getToken(job))), ranges,
-            binnedRanges).isEmpty()) {
-          if (!(instance instanceof MockInstance)) {
-            if (tableId == null)
-              tableId = Tables.getTableId(instance, tableName);
-            if (!Tables.exists(instance, tableId))
-              throw new TableDeletedException(tableId);
-            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-              throw new TableOfflineException(instance, tableId);
+          while (binnedRanges == null) {
+            // Some tablets were still online, try again
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            binnedRanges = binOfflineTable(job, tableName, ranges);
           }
-          binnedRanges.clear();
-          log.warn("Unable to locate bins for specified ranges. Retrying.");
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+        } else {
+          Instance instance = getInstance(job);
+          String tableId = null;
+          tl = getTabletLocator(job, tableName);
+          // it's possible that the cache could contain complete but stale information about a table's tablets, so clear it
           tl.invalidateCache();
+          Credentials creds = new Credentials(getPrincipal(job), deserialize(getTokenClass(job), getToken(job)));
+
+          while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
+            if (!(instance instanceof MockInstance)) {
+              if (tableId == null)
+                tableId = Tables.getTableId(instance, tableName);
+              if (!Tables.exists(instance, tableId))
+                throw new TableDeletedException(tableId);
+              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+                throw new TableOfflineException(instance, tableId);
+            }
+            binnedRanges.clear();
+            log.warn("Unable to locate bins for specified ranges. Retrying.");
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            tl.invalidateCache();
+          }
         }
+      } catch (Exception e) {
+        throw new IOException(e);
       }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-    
-    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
-    HashMap<Range,ArrayList<String>> splitsToAdd = null;
-    
-    if (!autoAdjust)
-      splitsToAdd = new HashMap<Range,ArrayList<String>>();
-    
-    HashMap<String,String> hostNameCache = new HashMap<String,String>();
-    
-    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-      String ip = tserverBin.getKey().split(":", 2)[0];
-      String location = hostNameCache.get(ip);
-      if (location == null) {
-        InetAddress inetAddress = InetAddress.getByName(ip);
-        location = inetAddress.getHostName();
-        hostNameCache.put(ip, location);
-      }
-      
-      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-        Range ke = extentRanges.getKey().toDataRange();
-        for (Range r : extentRanges.getValue()) {
-          if (autoAdjust) {
-            // divide ranges into smaller ranges, based on the tablets
-            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
-          } else {
-            // don't divide ranges
-            ArrayList<String> locations = splitsToAdd.get(r);
-            if (locations == null)
-              locations = new ArrayList<String>(1);
-            locations.add(location);
-            splitsToAdd.put(r, locations);
+
+      HashMap<Range,ArrayList<String>> splitsToAdd = null;
+
+      if (!autoAdjust)
+        splitsToAdd = new HashMap<Range,ArrayList<String>>();
+
+      HashMap<String,String> hostNameCache = new HashMap<String,String>();
+
+      for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+        String ip = tserverBin.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getHostName();
+          hostNameCache.put(ip, location);
+        }
+
+        for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+          Range ke = extentRanges.getKey().toDataRange();
+          for (Range r : extentRanges.getValue()) {
+            if (autoAdjust) {
+              // divide ranges into smaller ranges, based on the tablets
+              splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
+            } else {
+              // don't divide ranges
+              ArrayList<String> locations = splitsToAdd.get(r);
+              if (locations == null)
+                locations = new ArrayList<String>(1);
+              locations.add(location);
+              splitsToAdd.put(r, locations);
+            }
           }
         }
       }
+
+      if (!autoAdjust)
+        for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
+          splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
     }
-    
-    if (!autoAdjust)
-      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
-        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
+
     return splits.toArray(new InputSplit[splits.size()]);
   }
-  
+
   /**
    * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
    */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e781879d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index f70f6e2..7bd8da6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -50,7 +51,6 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -76,6 +76,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import static org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer.deserialize;
+
 /**
  * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs.
  * <p>
@@ -272,24 +274,44 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param tableName
    *          the table to use when the tablename is null in the write call
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
   public static void setInputTableName(Job job, String tableName) {
     InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
   }
-  
+
   /**
-   * Gets the table name from the configuration.
-   * 
+   * Sets the names of the input tables over which this job will scan.
+   *
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param tableNames
+   *          the tables over which this job will scan
+   * @since 1.6.0
+   */
+  public static void setInputTableNames(Job job, Collection<String> tableNames) {
+    InputConfigurator.setInputTableNames(CLASS, job.getConfiguration(), tableNames);
+  }
+
+
+  /**
+   * Gets the default input table name from the configuration; if multiple tables have been set, the first one
+   * is used.
+   *
    * @param context
    *          the Hadoop context for the configured job
    * @return the table name
    * @since 1.5.0
    * @see #setInputTableName(Job, String)
    */
-  protected static String getInputTableName(JobContext context) {
-    return InputConfigurator.getInputTableName(CLASS, getConfiguration(context));
+  protected static String getDefaultInputTableName(JobContext context) {
+    String[] tableNames = InputConfigurator.getInputTableNames(CLASS, getConfiguration(context));
+    if (tableNames.length > 0)
+      return tableNames[0];
+    else
+      return null;
   }
-  
+
   /**
    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
    * 
@@ -328,7 +350,21 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setRanges(Job job, Collection<Range> ranges) {
     InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
   }
-  
+
+  /**
+   * Sets the input ranges to scan, per table, for this job. Any table with no ranges set will be scanned in its entirety.
+   *
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param ranges
+   *          the ranges that will be mapped over, keyed by table name
+   * @since 1.6.0
+   */
+  public static void setRanges(Job job, Map<String, Collection<Range>> ranges) {
+    InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
+  }
+
+
   /**
    * Gets the ranges to scan over from a job.
    * 
@@ -340,7 +376,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @since 1.5.0
    * @see #setRanges(Job, Collection)
    */
-  protected static List<Range> getRanges(JobContext context) throws IOException {
+  protected static Map<String, List<Range>> getRanges(JobContext context) throws IOException {
     return InputConfigurator.getRanges(CLASS, getConfiguration(context));
   }
   
@@ -545,8 +581,8 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
-    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context));
+  protected static TabletLocator getTabletLocator(JobContext context, String table) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), table);
   }
   
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -611,14 +647,14 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       
       try {
         log.debug("Creating connector with user: " + principal);
-        Connector conn = instance.getConnector(principal, AuthenticationTokenSerializer.deserialize(tokenClass, token));
-        log.debug("Creating scanner for table: " + getInputTableName(attempt));
+        Connector conn = instance.getConnector(principal, deserialize(tokenClass, token));
+        log.debug("Creating scanner for table: " + getDefaultInputTableName(attempt));
         log.debug("Authorizations are: " + authorizations);
         if (isOfflineScan(attempt)) {
-          scanner = new OfflineScanner(instance, new Credentials(principal, AuthenticationTokenSerializer.deserialize(tokenClass, token)), Tables.getTableId(
-              instance, getInputTableName(attempt)), authorizations);
+          scanner = new OfflineScanner(instance, new Credentials(principal, deserialize(tokenClass, token)), Tables.getTableId(
+              instance, getDefaultInputTableName(attempt)), authorizations);
         } else {
-          scanner = conn.createScanner(getInputTableName(attempt), authorizations);
+          scanner = conn.createScanner(getDefaultInputTableName(attempt), authorizations);
         }
         if (isIsolated(attempt)) {
           log.info("Creating isolated scanner");
@@ -684,7 +720,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     
     Instance instance = getInstance(context);
-    Connector conn = instance.getConnector(getPrincipal(context), AuthenticationTokenSerializer.deserialize(getTokenClass(context), getToken(context)));
+    Connector conn = instance.getConnector(getPrincipal(context), deserialize(getTokenClass(context), getToken(context)));
     String tableId = Tables.getTableId(instance, tableName);
     
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
@@ -776,98 +812,194 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     return binnedRanges;
   }
   
-  /**
-   * Read the metadata table to get tablets and match up ranges to them.
-   */
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException {
-    log.setLevel(getLogLevel(context));
-    validateOptions(context);
-    
-    String tableName = getInputTableName(context);
-    boolean autoAdjust = getAutoAdjustRanges(context);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
-    
-    if (ranges.isEmpty()) {
-      ranges = new ArrayList<Range>(1);
-      ranges.add(new Range());
-    }
-    
-    // get the metadata information for these ranges
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    TabletLocator tl;
-    try {
-      if (isOfflineScan(context)) {
-        binnedRanges = binOfflineTable(context, tableName, ranges);
-        while (binnedRanges == null) {
-          // Some tablets were still online, try again
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-          binnedRanges = binOfflineTable(context, tableName, ranges);
-        }
-      } else {
-        Instance instance = getInstance(context);
-        String tableId = null;
-        tl = getTabletLocator(context);
-        // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
-        tl.invalidateCache();
-        while (!tl.binRanges(new Credentials(getPrincipal(context), AuthenticationTokenSerializer.deserialize(getTokenClass(context), getToken(context))),
-            ranges, binnedRanges).isEmpty()) {
-          if (!(instance instanceof MockInstance)) {
-            if (tableId == null)
-              tableId = Tables.getTableId(instance, tableName);
-            if (!Tables.exists(instance, tableId))
-              throw new TableDeletedException(tableId);
-            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-              throw new TableOfflineException(instance, tableId);
+//  /**
+//   * Read the metadata table to get tablets and match up ranges to them.
+//   */
+//  @Override
+//  public List<InputSplit> getSplits(JobContext context) throws IOException {
+//    log.setLevel(getLogLevel(context));
+//    validateOptions(context);
+//
+//    boolean autoAdjust = getAutoAdjustRanges(context);
+//    String tableName = getDefaultInputTableName(context);
+//    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
+//
+//    if (ranges.isEmpty()) {
+//      ranges = new ArrayList<Range>(1);
+//      ranges.add(new Range());
+//    }
+//
+//    // get the metadata information for these ranges
+//    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+//    TabletLocator tl;
+//    try {
+//      if (isOfflineScan(context)) {
+//        binnedRanges = binOfflineTable(context, tableName, ranges);
+//        while (binnedRanges == null) {
+//          // Some tablets were still online, try again
+//          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+//          binnedRanges = binOfflineTable(context, tableName, ranges);
+//        }
+//      } else {
+//        Instance instance = getInstance(context);
+//        String tableId = null;
+//        tl = getTabletLocator(context);
+//        // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
+//        tl.invalidateCache();
+//        while (!tl.binRanges(new Credentials(getPrincipal(context), AuthenticationTokenSerializer.deserialize(getTokenClass(context), getToken(context))),
+//            ranges, binnedRanges).isEmpty()) {
+//          if (!(instance instanceof MockInstance)) {
+//            if (tableId == null)
+//              tableId = Tables.getTableId(instance, tableName);
+//            if (!Tables.exists(instance, tableId))
+//              throw new TableDeletedException(tableId);
+//            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+//              throw new TableOfflineException(instance, tableId);
+//          }
+//          binnedRanges.clear();
+//          log.warn("Unable to locate bins for specified ranges. Retrying.");
+//          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+//          tl.invalidateCache();
+//        }
+//      }
+//    } catch (Exception e) {
+//      throw new IOException(e);
+//    }
+//
+//    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
+//    HashMap<Range,ArrayList<String>> splitsToAdd = null;
+//
+//    if (!autoAdjust)
+//      splitsToAdd = new HashMap<Range,ArrayList<String>>();
+//
+//    HashMap<String,String> hostNameCache = new HashMap<String,String>();
+//
+//    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+//      String ip = tserverBin.getKey().split(":", 2)[0];
+//      String location = hostNameCache.get(ip);
+//      if (location == null) {
+//        InetAddress inetAddress = InetAddress.getByName(ip);
+//        location = inetAddress.getHostName();
+//        hostNameCache.put(ip, location);
+//      }
+//
+//      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+//        Range ke = extentRanges.getKey().toDataRange();
+//        for (Range r : extentRanges.getValue()) {
+//          if (autoAdjust) {
+//            // divide ranges into smaller ranges, based on the tablets
+//            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
+//          } else {
+//            // don't divide ranges
+//            ArrayList<String> locations = splitsToAdd.get(r);
+//            if (locations == null)
+//              locations = new ArrayList<String>(1);
+//            locations.add(location);
+//            splitsToAdd.put(r, locations);
+//          }
+//        }
+//      }
+//    }
+//
+//    if (!autoAdjust)
+//      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
+//        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
+//    return splits;
+//  }
+
+  /**
+   * Read the metadata table to get tablets and match up ranges to them.
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext conf) throws IOException {
+    log.setLevel(getLogLevel(conf));
+    validateOptions(conf);
+
+    boolean autoAdjust = getAutoAdjustRanges(conf);
+    Map<String,List<Range>> tablesRanges = getRanges(conf);
+    LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
+
+    for (Entry<String,List<Range>> tableRanges : tablesRanges.entrySet()) {
+      String tableName = tableRanges.getKey();
+      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableRanges.getValue()) : tableRanges.getValue();
+
+      if (ranges.isEmpty()) {
+        ranges = new ArrayList<Range>(1);
+        ranges.add(new Range());
+      }
+
+      // get the metadata information for these ranges
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+      TabletLocator tl;
+      try {
+        if (isOfflineScan(conf)) {
+          binnedRanges = binOfflineTable(conf, tableName, ranges);
+          while (binnedRanges == null) {
+            // Some tablets were still online, try again
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            binnedRanges = binOfflineTable(conf, tableName, ranges);
           }
-          binnedRanges.clear();
-          log.warn("Unable to locate bins for specified ranges. Retrying.");
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+        } else {
+          Instance instance = getInstance(conf);
+          String tableId = null;
+          tl = getTabletLocator(conf, tableName);
+          // it's possible that the cache could contain complete but stale information about a table's tablets, so clear it
           tl.invalidateCache();
+          Credentials creds = new Credentials(getPrincipal(conf), deserialize(getTokenClass(conf), getToken(conf)));
+
+          while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
+            if (!(instance instanceof MockInstance)) {
+              if (tableId == null)
+                tableId = Tables.getTableId(instance, tableName);
+              if (!Tables.exists(instance, tableId))
+                throw new TableDeletedException(tableId);
+              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+                throw new TableOfflineException(instance, tableId);
+            }
+            binnedRanges.clear();
+            log.warn("Unable to locate bins for specified ranges. Retrying.");
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            tl.invalidateCache();
+          }
         }
+      } catch (Exception e) {
+        throw new IOException(e);
       }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-    
-    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
-    HashMap<Range,ArrayList<String>> splitsToAdd = null;
-    
-    if (!autoAdjust)
-      splitsToAdd = new HashMap<Range,ArrayList<String>>();
-    
-    HashMap<String,String> hostNameCache = new HashMap<String,String>();
-    
-    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-      String ip = tserverBin.getKey().split(":", 2)[0];
-      String location = hostNameCache.get(ip);
-      if (location == null) {
-        InetAddress inetAddress = InetAddress.getByName(ip);
-        location = inetAddress.getHostName();
-        hostNameCache.put(ip, location);
-      }
-      
-      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-        Range ke = extentRanges.getKey().toDataRange();
-        for (Range r : extentRanges.getValue()) {
-          if (autoAdjust) {
-            // divide ranges into smaller ranges, based on the tablets
-            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
-          } else {
-            // don't divide ranges
-            ArrayList<String> locations = splitsToAdd.get(r);
-            if (locations == null)
-              locations = new ArrayList<String>(1);
-            locations.add(location);
-            splitsToAdd.put(r, locations);
+
+      HashMap<Range,ArrayList<String>> splitsToAdd = null;
+
+      if (!autoAdjust)
+        splitsToAdd = new HashMap<Range,ArrayList<String>>();
+
+      HashMap<String,String> hostNameCache = new HashMap<String,String>();
+
+      for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+        String ip = tserverBin.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getHostName();
+          hostNameCache.put(ip, location);
+        }
+
+        for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+          Range ke = extentRanges.getKey().toDataRange();
+          for (Range r : extentRanges.getValue()) {
+            if (autoAdjust) {
+              // divide ranges into smaller ranges, based on the tablets
+              splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
+            } else {
+              // don't divide ranges
+              ArrayList<String> locations = splitsToAdd.get(r);
+              if (locations == null)
+                locations = new ArrayList<String>(1);
+              locations.add(location);
+              splitsToAdd.put(r, locations);
+            }
           }
         }
       }
+
+      if (!autoAdjust)
+        for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
+          splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
     }
-    
-    if (!autoAdjust)
-      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
-        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
     return splits;
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e781879d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/support/TableRange.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/support/TableRange.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/support/TableRange.java
new file mode 100644
index 0000000..db0e327
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/support/TableRange.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce.lib.support;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Pairs together a table name and a range.
+ */
+public class TableRange implements Writable {
+  private String tableName;
+  private Range range;
+
+  public TableRange() {
+    range = new Range();
+  }
+
+  public TableRange(String tableName, Range range) {
+    this.tableName = tableName;
+    this.range = range;
+  }
+
+  public String tableName() {
+    return tableName;
+  }
+
+  public Range range() {
+    return range;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    tableName = in.readUTF();
+    range.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(tableName);
+    range.write(out);
+  }
+
+}
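
The InputConfigurator changes below store each TableRange in the job configuration as a Base64 string. A round-trip sketch of that encoding, using only the class added above plus standard java.io and commons-codec calls (the table name and row keys are placeholders):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.accumulo.core.client.mapreduce.lib.support.TableRange;
    import org.apache.accumulo.core.data.Range;
    import org.apache.commons.codec.binary.Base64;

    public class TableRangeRoundTrip {
      public static void main(String[] args) throws IOException {
        // encode: Writable -> bytes -> Base64 string (as setRanges does)
        TableRange in = new TableRange("events", new Range("a", "m"));
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(baos));
        String encoded = new String(Base64.encodeBase64(baos.toByteArray()));

        // decode: Base64 string -> bytes -> Writable (as getRanges does)
        TableRange out = new TableRange(); // no-arg ctor pre-creates the Range for readFields
        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(encoded.getBytes()));
        out.readFields(new DataInputStream(bais));

        // prints the decoded table name and range
        System.out.println(out.tableName() + " -> " + out.range());
      }
    }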

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e781879d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index 9d7c878..dc77bf8 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@ -22,11 +22,14 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.TreeMap;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -40,6 +43,7 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.mapreduce.lib.support.TableRange;
 import org.apache.accumulo.core.client.mock.MockTabletLocator;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
 import org.apache.accumulo.core.data.Range;
@@ -54,6 +58,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.accumulo.core.util.ArgumentChecker.notNull;
+import static org.apache.commons.codec.binary.Base64.encodeBase64;
+import static org.apache.hadoop.util.StringUtils.join;
+import static org.apache.hadoop.util.StringUtils.split;
+
 /**
  * @since 1.5.0
  */
@@ -87,12 +96,30 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param tableName
    *          the table to use when the tablename is null in the write call
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
   public static void setInputTableName(Class<?> implementingClass, Configuration conf, String tableName) {
-    ArgumentChecker.notNull(tableName);
+    notNull(tableName);
     conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName);
   }
-  
+
+  /**
+   * Sets the names of the input tables over which this job will scan.
+   *
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param tableNames
+   *          the tables over which this job will scan
+   * @since 1.6.0
+   */
+  public static void setInputTableNames(Class<?> implementingClass, Configuration conf, Collection<String> tableNames) {
+    notNull(tableNames);
+    conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), join(",", tableNames)); // TODO: should this key change to TABLE_NAMES?
+  }
+
+
   /**
    * Gets the table name from the configuration.
    * 
@@ -104,8 +131,8 @@ public class InputConfigurator extends ConfiguratorBase {
    * @since 1.5.0
    * @see #setInputTableName(Class, Configuration, String)
    */
-  public static String getInputTableName(Class<?> implementingClass, Configuration conf) {
-    return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME));
+  public static String[] getInputTableNames(Class<?> implementingClass, Configuration conf) {
+    return split(conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), ","));
   }
   
   /**
@@ -150,22 +177,67 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param ranges
    *          the ranges that will be mapped over
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
   public static void setRanges(Class<?> implementingClass, Configuration conf, Collection<Range> ranges) {
-    ArgumentChecker.notNull(ranges);
+    notNull(ranges);
+
+    String[] tableNames = getInputTableNames(implementingClass, conf);
+    if (tableNames.length > 0) {
+      String tableName = tableNames[0];
+
+      ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
+      try {
+        for (Range r : ranges) {
+          // wrap each range with the default table so getRanges() can bucket it by table name
+          TableRange tblRange = new TableRange(tableName, r);
+          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          tblRange.write(new DataOutputStream(baos));
+          rangeStrings.add(new String(encodeBase64(baos.toByteArray()), Constants.UTF8));
+        }
+      } catch (IOException ex) {
+        throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
+      }
+      conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
+    } else {
+      throw new IllegalStateException("Ranges cannot be added until a table is set.");
+    }
+  }
+
+  /**
+   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
+   *
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param ranges
+   *          the ranges that will be mapped over, keyed by table name
+   * @since 1.6.0
+   */
+  public static void setRanges(Class<?> implementingClass, Configuration conf, Map<String, Collection<Range>> ranges) {
+    notNull(ranges);
+
     ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
-    try {
-      for (Range r : ranges) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        r.write(new DataOutputStream(baos));
-        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()), Constants.UTF8));
+    for (Map.Entry<String,Collection<Range>> pair : ranges.entrySet()) {
+      try {
+        String tableName = pair.getKey();
+        for (Range r : pair.getValue()) {
+          TableRange tblRange = new TableRange(tableName, r);
+          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          tblRange.write(new DataOutputStream(baos));
+          rangeStrings.add(new String(encodeBase64(baos.toByteArray()), Constants.UTF8));
+        }
+      } catch (IOException ex) {
+        throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
       }
-    } catch (IOException ex) {
-      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
     }
     conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
   }
-  
+
+
   /**
    * Gets the ranges to scan over from a job.
    * 
@@ -179,14 +251,22 @@ public class InputConfigurator extends ConfiguratorBase {
    * @since 1.5.0
    * @see #setRanges(Class, Configuration, Collection)
    */
-  public static List<Range> getRanges(Class<?> implementingClass, Configuration conf) throws IOException {
-    ArrayList<Range> ranges = new ArrayList<Range>();
+  public static Map<String, List<Range>> getRanges(Class<?> implementingClass, Configuration conf) throws IOException {
+    TreeMap<String,List<Range>> ranges = new TreeMap<String,List<Range>>();
+
+    // create collections for each table
+    for (String table : getInputTableNames(implementingClass, conf)) {
+      ranges.put(table, new ArrayList<Range>());
+    }
+
+    // parse out the ranges and add them to table's bucket
     for (String rangeString : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES))) {
-      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(Constants.UTF8)));
-      Range range = new Range();
+      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(Constants.UTF8)));
+      TableRange range = new TableRange();
       range.readFields(new DataInputStream(bais));
-      ranges.add(range);
+      ranges.get(range.tableName()).add(range.range());
     }
+
     return ranges;
   }
   
@@ -203,15 +283,15 @@ public class InputConfigurator extends ConfiguratorBase {
    * @since 1.5.0
    */
   public static void fetchColumns(Class<?> implementingClass, Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
-    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
+    notNull(columnFamilyColumnQualifierPairs);
     ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
     for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
       if (column.getFirst() == null)
         throw new IllegalArgumentException("Column family can not be null");
       
-      String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())), Constants.UTF8);
+      String col = new String(encodeBase64(TextUtil.getBytes(column.getFirst())), Constants.UTF8);
       if (column.getSecond() != null)
-        col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())), Constants.UTF8);
+        col += ":" + new String(encodeBase64 (TextUtil.getBytes (column.getSecond ())), Constants.UTF8);
       columnStrings.add(col);
     }
     conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings.toArray(new String[0]));
@@ -255,7 +335,7 @@ public class InputConfigurator extends ConfiguratorBase {
     String newIter;
     try {
       cfg.write(new DataOutputStream(baos));
-      newIter = new String(Base64.encodeBase64(baos.toByteArray()), Constants.UTF8);
+      newIter = new String(encodeBase64(baos.toByteArray()), Constants.UTF8);
       baos.close();
     } catch (IOException e) {
       throw new IllegalArgumentException("unable to serialize IteratorSetting");
@@ -473,13 +553,13 @@ public class InputConfigurator extends ConfiguratorBase {
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf) throws TableNotFoundException {
+  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf, String tableName)
+          throws TableNotFoundException {
     String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
     if ("MockInstance".equals(instanceType))
       return new MockTabletLocator();
     Instance instance = getInstance(implementingClass, conf);
-    String tableName = getInputTableName(implementingClass, conf);
-    return TabletLocator.getLocator(instance, new Text(Tables.getTableId(instance, tableName)));
+    return TabletLocator.getLocator(instance, new Text(Tables.getTableId(instance, tableName)));
   }
   
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -503,18 +583,24 @@ public class InputConfigurator extends ConfiguratorBase {
     // validate that we can connect as configured
     try {
       Connector c = getInstance(implementingClass, conf).getConnector(getPrincipal(implementingClass, conf),
-          AuthenticationTokenSerializer.deserialize(getTokenClass(implementingClass, conf), getToken(implementingClass, conf)));
+          AuthenticationTokenSerializer.deserialize (getTokenClass (implementingClass, conf), getToken (implementingClass, conf)));
       if (!c.securityOperations().authenticateUser(getPrincipal(implementingClass, conf),
-          AuthenticationTokenSerializer.deserialize(getTokenClass(implementingClass, conf), getToken(implementingClass, conf))))
+          AuthenticationTokenSerializer.deserialize (getTokenClass (implementingClass, conf), getToken (implementingClass, conf))))
         throw new IOException("Unable to authenticate user");
-      if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), getInputTableName(implementingClass, conf), TablePermission.READ))
-        throw new IOException("Unable to access table");
-      
+
+      for (String tableName : getInputTableNames(implementingClass, conf)) {
+        if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), tableName, TablePermission.READ))
+          throw new IOException("Unable to access table " + tableName);
+      }
+
       if (!conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false)) {
         // validate that any scan-time iterators can be loaded by the the tablet servers
-        for (IteratorSetting iter : getIterators(implementingClass, conf)) {
-          if (!c.tableOperations().testClassLoad(getInputTableName(implementingClass, conf), iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
-            throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
+        for (IteratorSetting iter : getIterators(implementingClass, conf)) { // TODO: these iterators need to be separated by table
+          for (String tableName : getInputTableNames(implementingClass, conf)) {
+            if (!c.tableOperations().testClassLoad(tableName, iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
+              throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
+          }
         }
       }
       

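To make the new storage format concrete, a sketch of the set/get round trip through a Hadoop Configuration, using the static InputConfigurator API as this diff defines it. AccumuloInputFormat is used as a stand-in implementing class, and the table names are placeholders; note that getRanges creates an empty bucket for every configured table.

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.conf.Configuration;

    public class RangeConfigRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        InputConfigurator.setInputTableNames(AccumuloInputFormat.class, conf, Arrays.asList("t1", "t2"));

        Map<String,Collection<Range>> in = new HashMap<String,Collection<Range>>();
        in.put("t1", Collections.singleton(new Range("a", "c")));
        InputConfigurator.setRanges(AccumuloInputFormat.class, conf, in);

        // t1 -> [the range above]; t2 -> [] (an empty list is created for each configured table)
        Map<String,List<Range>> out = InputConfigurator.getRanges(AccumuloInputFormat.class, conf);
        System.out.println(out);
      }
    }
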
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e781879d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/multi/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/multi/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/multi/InputFormatBase.java
index dcbbed9..2aec520 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/multi/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/multi/InputFormatBase.java
@@ -150,10 +150,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     ArgumentChecker.notEmpty (tables);
     String tablesCsv = StringUtils.join(tables.iterator(), ',');
     conf.set(TABLE_NAMES, tablesCsv);
-    
+
     if (auths != null && !auths.isEmpty())
       conf.set(AUTHORIZATIONS, auths.serialize());
-    
+
     try {
       FileSystem fs = FileSystem.get(conf);
       Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");
@@ -161,22 +161,45 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       FSDataOutputStream fos = fs.create(file, false);
       fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
       fs.deleteOnExit(file);
-      
+
       byte[] encodedPw = Base64.encodeBase64(passwd);
       fos.writeInt(encodedPw.length);
       fos.write(encodedPw);
       fos.close();
-      
+
       DistributedCache.addCacheFile(file.toUri(), conf);
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
     }
 
   }
-  
+
+  /**
+   * Initialize the user, table, and authorization information for the configuration object that will be used with an Accumulo InputFormat.
+   * This method has been deprecated in favor of using the multi-table version.
+   *
+   * @deprecated since 1.6.0
+   *
+   * @param conf
+   *          the Hadoop configuration object
+   * @param user
+   *          a valid accumulo user
+   * @param passwd
+   *          the user's password
+   * @param tableName
+   *          the table to read
+   * @param auths
+   *          the authorizations used to restrict data read
+   */
+  @Deprecated
+  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String tableName, Authorizations auths) {
+    setInputInfo(conf, user, passwd, Collections.singleton(tableName), auths);
+  }
+
+
   /**
    * Configure a {@link ZooKeeperInstance} for this configuration object.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @param instanceName
@@ -188,15 +211,15 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
       throw new IllegalStateException("Instance info can only be set once per job");
     conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    
+
     ArgumentChecker.notNull(instanceName, zooKeepers);
     conf.set(INSTANCE_NAME, instanceName);
     conf.set(ZOOKEEPERS, zooKeepers);
   }
-  
+
   /**
    * Configure a {@link MockInstance} for this configuration object.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @param instanceName
@@ -207,10 +230,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     conf.setBoolean(MOCK, true);
     conf.set(INSTANCE_NAME, instanceName);
   }
-  
+
   /**
    * Set the ranges to map over for this configuration object.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @param ranges
@@ -218,7 +241,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setRanges(Configuration conf, Map<String, Collection<Range>> ranges) {
     ArgumentChecker.notNull(ranges);
-  
+
     ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
     for(Entry<String, Collection<Range>> pair : ranges.entrySet()) {
       try {
@@ -235,7 +258,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     }
     conf.setStrings(RANGES, rangeStrings.toArray(new String[0]));
   }
-  
+
   /**
    * Disables the adjustment of ranges for this configuration object. By default, overlapping ranges will be merged and ranges will be fit to existing tablet
    * boundaries. Disabling this adjustment will cause there to be exactly one mapper per range set using {@link #setRanges(Configuration, Map)}.
@@ -246,11 +269,11 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void disableAutoAdjustRanges(Configuration conf) {
     conf.setBoolean(AUTO_ADJUST_RANGES, false);
   }
-  
+
   /**
    * Sets the max # of values that may be returned for an individual Accumulo cell. By default, applied before all other Accumulo iterators (highest priority)
    * leveraged in the scan by the record reader. To adjust priority use setIterator() & setIteratorOptions() w/ the VersioningIterator type explicitly.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @param maxVersions
@@ -263,41 +286,41 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       throw new IOException("Invalid maxVersions: " + maxVersions + ".  Must be >= 1");
     conf.setInt(MAX_VERSIONS, maxVersions);
   }
-  
+
   /**
    * <p>
   * Enable reading offline tables. This will make the map reduce job directly read the table's files. If the table is not offline, then the job will fail. If
   * the table comes online during the map reduce job, it's likely that the job will fail.
-   * 
+   *
    * <p>
    * To use this option, the map reduce user will need access to read the accumulo directory in HDFS.
-   * 
+   *
    * <p>
    * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the accumulo directory in HDFS are non-standard.
-   * 
+   *
    * <p>
    * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
   * reason to do this is that compaction will reduce each tablet in the table to one file, and it's faster to read from one file.
-   * 
+   *
    * <p>
   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
    * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
-   * 
+   *
    * @param conf
    *          the job
    * @param scanOff
    *          pass true to read offline tables
    */
-  
+
   public static void setScanOffline(Configuration conf, boolean scanOff) {
     conf.setBoolean(READ_OFFLINE, scanOff);
   }
-  
+
   /**
    * Restricts the columns that will be mapped over for this configuration object.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @param columnFamilyColumnQualifierPairs
@@ -310,7 +333,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
       if (column.getFirst() == null)
         throw new IllegalArgumentException("Column family can not be null");
-      
+
       String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())));
       if (column.getSecond() != null)
         col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())));
@@ -318,10 +341,37 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     }
     conf.setStrings(COLUMNS, columnStrings.toArray(new String[0]));
   }
-  
+
+  /**
+   * Restricts the columns that will be mapped over for this configuration object.
+   *
+   * @param conf
+   *          the Hadoop configuration object
+   * @param table
+   *          the table to which the column fetch should be applied
+   * @param columnFamilyColumnQualifierPairs
+   *          A pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
+   *          selected. An empty set is the default and is equivalent to scanning all the columns.
+   */
+  public static void fetchColumns(Configuration conf, String table, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
+    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
+    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
+    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+      if (column.getFirst() == null)
+        throw new IllegalArgumentException("Column family can not be null");
+
+      String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())));
+      if (column.getSecond() != null)
+        col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())));
+      columnStrings.add(col);
+    }
+    conf.setStrings(COLUMNS + "\u0000" + table, columnStrings.toArray(new String[0]));
+  }
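Usage of the new per-table overload might look like the following (the table
and column names are placeholders):

    List<Pair<Text,Text>> columns = new ArrayList<Pair<Text,Text>>();
    // A null qualifier selects the entire "meta" column family.
    columns.add(new Pair<Text,Text>(new Text("meta"), null));
    columns.add(new Pair<Text,Text>(new Text("content"), new Text("raw")));
    // Restricts only the scan over table "logs"; other input tables are
    // unaffected because the COLUMNS key is suffixed with the table name.
    fetchColumns(conf, "logs", columns);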
+
   /**
    * Sets the log level for this configuration object.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @param level
@@ -332,10 +382,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     log.setLevel(level);
     conf.setInt(LOGLEVEL, level.toInt());
   }
-  
+
   /**
    * Encode an iterator on the input for this configuration object.
-   * 
+   *
    * @param conf
    *          The Hadoop configuration in which to save the iterator configuration
    * @param tableIterators
@@ -349,7 +399,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     if(iterators != null) {
       throw new IllegalArgumentException("Iterators can only be set once per configuration.");
     }
-    
+
     for(Entry<String, ? extends Collection<IteratorSetting>> tableItrs : tableIterators.entrySet()) {
       String table = tableItrs.getKey();
       for (IteratorSetting cfg : tableItrs.getValue()) {
@@ -365,9 +415,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
         for (Entry<String,String> entry : cfg.getOptions().entrySet()) {
           if (entry.getValue() == null)
             continue;
-          
+
           String iteratorOptions = conf.get(ITERATORS_OPTIONS);
-          
+
           // No options specified yet, create a new string
           if (iteratorOptions == null || iteratorOptions.isEmpty()) {
             iteratorOptions = new AccumuloIteratorOption(table, cfg.getName(), entry.getKey(), entry.getValue()).toString();
@@ -375,17 +425,17 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
             // append the next option & reset
             iteratorOptions = iteratorOptions.concat(ITERATORS_DELIM + new AccumuloIteratorOption(table, cfg.getName(), entry.getKey(), entry.getValue()));
           }
-          
+
           // Store the options w/ the job
           conf.set(ITERATORS_OPTIONS, iteratorOptions);
         }
       }
     }
   }
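The enclosing setter's signature is hidden by the hunk boundary; assuming it
accepts a Map keyed by table name (as the loop suggests), a per-table iterator
could be configured like this (the table name and regex are illustrative):

    Map<String,List<IteratorSetting>> tableIterators = new HashMap<String,List<IteratorSetting>>();
    IteratorSetting regex = new IteratorSetting(50, "rowFilter",
        "org.apache.accumulo.core.iterators.user.RegExFilter");
    regex.addOption("rowRegex", "2013-09-.*");
    tableIterators.put("logs", Collections.singletonList(regex));
    // Note: iterators can only be set once per configuration (see above).
    setIterators(conf, tableIterators);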
-  
+
   /**
    * Determines whether a configuration has isolation enabled.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return true if isolation is enabled, false otherwise
@@ -394,10 +444,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static boolean isIsolated(Configuration conf) {
     return conf.getBoolean(ISOLATED, false);
   }
-  
+
   /**
    * Determines whether a configuration uses local iterators.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return true if uses local iterators, false otherwise
@@ -406,10 +456,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static boolean usesLocalIterators(Configuration conf) {
     return conf.getBoolean(LOCAL_ITERATORS, false);
   }
-  
+
   /**
    * Gets the user name from the configuration.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return the user name
@@ -418,10 +468,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static String getUsername(Configuration conf) {
     return conf.get(USERNAME);
   }
-  
+
   /**
    * Gets the password from the configuration.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return the BASE64-encoded password
@@ -431,19 +481,19 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static byte[] getPassword(Configuration conf) throws IOException {
     FileSystem fs = FileSystem.get(conf);
     Path file = new Path(conf.get(PASSWORD_PATH));
-    
+
     FSDataInputStream fdis = fs.open(file);
     int length = fdis.readInt();
     byte[] encodedPassword = new byte[length];
    fdis.readFully(encodedPassword); // readFully: a plain read() may return fewer bytes
     fdis.close();
-    
+
     return Base64.decodeBase64(encodedPassword);
   }
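The writer side of this file format isn't part of the patch, but a counterpart
consistent with the reader above would be (the path and password are
placeholders):

    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream fdos = fs.create(new Path("/user/mrjob/pw.bin"));
    byte[] encoded = Base64.encodeBase64("secret".getBytes());
    fdos.writeInt(encoded.length); // length prefix, read back by readInt()
    fdos.write(encoded);           // BASE64 payload, decoded by getPassword()
    fdos.close();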
-  
+
   /**
    * Gets the table name from the configuration.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return the table name
@@ -452,10 +502,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static String[] getTablenames(Configuration conf) {
     return StringUtils.split(conf.get(TABLE_NAMES), ',');
   }
-  
+
   /**
    * Gets the authorizations to set for the scans from the configuration.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return the accumulo scan authorizations
@@ -465,10 +515,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     String authString = conf.get(AUTHORIZATIONS);
     return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.split(","));
   }
-  
+
   /**
    * Initializes an Accumulo {@link Instance} based on the configuration.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return an accumulo instance
@@ -480,10 +530,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       return new MockInstance(conf.get(INSTANCE_NAME));
     return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
   }
-  
+
   /**
    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return an accumulo tablet locator
@@ -505,10 +555,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
 //    return TabletLocator.getInstance(instance, new AuthInfo (username, ByteBuffer.wrap(password), instance.getInstanceID()),
 //        new Text(Tables.getTableId(instance, tableName)));
   }
-  
+
   /**
    * Gets the ranges to scan over from a configuration object.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return the ranges
@@ -518,12 +568,12 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   protected static TreeMap<String, List<Range>> getRanges(Configuration conf) throws IOException {
     TreeMap<String, List<Range>> ranges = new TreeMap<String, List<Range>>();
-    
+
     // create collections for each table
     for(String table : getTablenames(conf)) {
       ranges.put(table, new ArrayList<Range>());
     }
-    
+
     // parse out the ranges and add them to each table's bucket
     for (String rangeString : conf.getStringCollection(RANGES)) {
       ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
@@ -531,14 +581,14 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       range.readFields(new DataInputStream(bais));
       ranges.get(range.tableName()).add(range.range());
     }
-    
+
     return ranges;
-    
+
   }
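The RANGES strings decoded here are Base64-encoded, Writable-serialized
TableRange entries (the class appears near the end of this file). The encoding
counterpart, sketched for illustration:

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    TableRange tr = new TableRange("logs", new Range("a", "m"));
    tr.write(new DataOutputStream(baos)); // writeUTF(tableName), then the range
    String encoded = new String(Base64.encodeBase64(baos.toByteArray()));
    // Strings like this accumulate under the RANGES key, one per range.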
-  
+
   /**
    * Gets the columns to be mapped over from this configuration object.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return a set of columns
@@ -554,10 +604,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     }
     return columns;
   }
-  
+
   /**
    * Determines whether a configuration has auto-adjust ranges enabled.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return true if auto-adjust is enabled, false otherwise
@@ -566,10 +616,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static boolean getAutoAdjustRanges(Configuration conf) {
     return conf.getBoolean(AUTO_ADJUST_RANGES, true);
   }
-  
+
   /**
    * Gets the log level from this configuration.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return the log level
@@ -578,11 +628,11 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static Level getLogLevel(Configuration conf) {
     return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
   }
-  
+
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
    * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @throws IOException
@@ -598,12 +648,12 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       Connector c = getInstance(conf).getConnector(getUsername(conf), getPassword(conf));
       if (!c.securityOperations().authenticateUser(getUsername(conf), getPassword(conf)))
         throw new IOException("Unable to authenticate user");
-      
+
       for(String tableName : getTablenames(conf)) {
         if (!c.securityOperations().hasTablePermission(getUsername(conf), tableName, TablePermission.READ))
          throw new IOException("Unable to access table " + tableName);
       }
-      
+
       if (!usesLocalIterators(conf)) {
        // validate that any scan-time iterators can be loaded by the tablet servers
         for (AccumuloIterator iter : getIterators(conf)) {
@@ -611,17 +661,17 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
             throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
         }
       }
-      
+
     } catch (AccumuloException e) {
       throw new IOException(e);
     } catch (AccumuloSecurityException e) {
       throw new IOException(e);
     }
   }
-  
+
   /**
    * Gets the maxVersions to use for the {@link VersioningIterator} from this configuration.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return the max versions, -1 if not configured
@@ -630,29 +680,29 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static int getMaxVersions(Configuration conf) {
     return conf.getInt(MAX_VERSIONS, -1);
   }
-  
+
   protected static boolean isOfflineScan(Configuration conf) {
     return conf.getBoolean(READ_OFFLINE, false);
   }
-  
+
   // Return a list of the iterator settings (for iterators to apply to a scanner)
-  
+
   /**
    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return a list of iterators
    * @see #setIterators(Configuration, IteratorSetting)
    */
   protected static List<AccumuloIterator> getIterators(Configuration conf) {
-    
+
     String iterators = conf.get(ITERATORS);
-    
+
     // If no iterators are present, return an empty list
     if (iterators == null || iterators.isEmpty())
       return new ArrayList<AccumuloIterator>();
-    
+
     // Compose the set of iterators encoded in the job configuration
     StringTokenizer tokens = new StringTokenizer(conf.get(ITERATORS), ITERATORS_DELIM);
     List<AccumuloIterator> list = new ArrayList<AccumuloIterator>();
@@ -662,10 +712,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     }
     return list;
   }
-  
+
   /**
    * Gets a list of the iterator options specified on this configuration.
-   * 
+   *
    * @param conf
    *          the Hadoop configuration object
    * @return a list of iterator options
@@ -673,11 +723,11 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) {
     String iteratorOptions = conf.get(ITERATORS_OPTIONS);
-    
+
     // If no options are present, return an empty list
     if (iteratorOptions == null || iteratorOptions.isEmpty())
       return new ArrayList<AccumuloIteratorOption>();
-    
+
     // Compose the set of options encoded in the job configuration
     StringTokenizer tokens = new StringTokenizer(conf.get(ITERATORS_OPTIONS), ITERATORS_DELIM);
     List<AccumuloIteratorOption> list = new ArrayList<AccumuloIteratorOption>();
@@ -687,15 +737,15 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     }
     return list;
   }
-  
+
   protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V> {
     protected long numKeysRead;
     protected Iterator<Entry<Key,Value>> scannerIterator;
     protected RangeInputSplit split;
-    
+
     /**
      * Apply the configured iterators from the configuration to the scanner.
-     * 
+     *
      * @param conf
      *          the Hadoop configuration object
      * @param scanner
@@ -705,7 +755,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     protected void setupIterators(Configuration conf, Scanner scanner, String table) throws AccumuloException {
       List<AccumuloIterator> iterators = getIterators(conf);
       List<AccumuloIteratorOption> options = getIteratorOptions(conf);
-      
+
       Map<String,IteratorSetting> scanIterators = new HashMap<String,IteratorSetting>();
       for (AccumuloIterator iterator : iterators) {
         if(iterator.getTable().equals(table)) {
@@ -721,10 +771,10 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
         scanner.addScanIterator(iterator);
       }
     }
-    
+
     /**
      * If maxVersions has been set, configure a {@link VersioningIterator} at priority 0 for this scanner.
-     * 
+     *
      * @param conf
      *          the Hadoop configuration object
      * @param scanner
@@ -739,14 +789,14 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
         scanner.addScanIterator(vers);
       }
     }
-    
+
     /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
     public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
       initialize(inSplit, attempt.getConfiguration());
     }
-    
+
     public void initialize(InputSplit inSplit, Configuration conf) throws IOException {
       Scanner scanner;
       split = (RangeInputSplit) inSplit;
@@ -755,7 +805,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       String user = getUsername(conf);
       byte[] password = getPassword(conf);
       Authorizations authorizations = getAuthorizations(conf);
-      
+
       try {
         log.debug("Creating connector with user: " + user);
         Connector conn = instance.getConnector(user, password);
@@ -781,7 +831,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       } catch (Exception e) {
         throw new IOException(e);
       }
-      
+
       // setup a scanner within the bounds of this split
       for (Pair<Text,Text> c : getFetchedColumns(conf)) {
         if (c.getSecond() != null) {
@@ -792,63 +842,63 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
           scanner.fetchColumnFamily(c.getFirst());
         }
       }
-      
+
       scanner.setRange(split.range);
-      
+
       numKeysRead = 0;
-      
+
       // do this last after setting all scanner options
       scannerIterator = scanner.iterator();
     }
-    
+
     public void close() {}
-    
+
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
       return split.getProgress(currentKey);
     }
-    
+
     protected K currentK = null;
     protected V currentV = null;
     protected Key currentKey = null;
     protected Value currentValue = null;
-    
+
     @Override
     public K getCurrentKey() throws IOException, InterruptedException {
       return currentK;
     }
-    
+
     @Override
     public V getCurrentValue() throws IOException, InterruptedException {
       return currentV;
     }
   }
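Concrete input formats finish this class by driving scannerIterator from
nextKeyValue(); a minimal subclass, modeled on what AccumuloInputFormat does
elsewhere (not part of this patch), might look like:

    static class KeyValueRecordReader extends RecordReaderBase<Key,Value> {
      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (scannerIterator.hasNext()) {
          ++numKeysRead;
          Entry<Key,Value> entry = scannerIterator.next();
          currentK = currentKey = entry.getKey();
          currentV = currentValue = entry.getValue();
          return true;
        }
        return false; // this split's range is exhausted
      }
    }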
-  
+
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(Configuration conf, String tableName, List<Range> ranges) throws TableNotFoundException,
       AccumuloException, AccumuloSecurityException, IOException {
-    
+
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    
+
     Instance instance = getInstance(conf);
     Connector conn = instance.getConnector(getUsername(conf), getPassword(conf));
     String tableId = Tables.getTableId(instance, tableName);
-    
+
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
       Tables.clearCache(instance);
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
        throw new AccumuloException("Table is online " + tableName + " (" + tableId + "); cannot scan table in offline mode");
       }
     }
-    
+
     for (Range range : ranges) {
       Text startRow;
-      
+
       if (range.getStartKey() != null)
         startRow = range.getStartKey().getRow();
       else
         startRow = new Text();
-      
+
       Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
       Scanner scanner = conn.createScanner(MetadataTable.NAME, Constants.NO_AUTHS);
 
@@ -857,82 +907,82 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
       scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
       scanner.setRange(metadataRange);
-      
+
       RowIterator rowIter = new RowIterator(scanner);
-      
+
       // TODO check that extents match prev extent
-      
+
       KeyExtent lastExtent = null;
-      
+
       while (rowIter.hasNext()) {
         Iterator<Entry<Key,Value>> row = rowIter.next();
         String last = "";
         KeyExtent extent = null;
         String location = null;
-        
+
         while (row.hasNext()) {
           Entry<Key,Value> entry = row.next();
           Key key = entry.getKey();
-          
+
           if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
             last = entry.getValue().toString();
           }
-          
+
           if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
               || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
             location = entry.getValue().toString();
           }
-          
+
           if (PREV_ROW_COLUMN.hasColumns(key)) {
             extent = new KeyExtent(key.getRow(), entry.getValue());
           }
-          
+
         }
-        
+
         if (location != null)
           return null;
-        
+
         if (!extent.getTableId().toString().equals(tableId)) {
           throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
         }
-        
+
         if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
           throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
         }
-        
+
         Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
         if (tabletRanges == null) {
           tabletRanges = new HashMap<KeyExtent,List<Range>>();
           binnedRanges.put(last, tabletRanges);
         }
-        
+
         List<Range> rangeList = tabletRanges.get(extent);
         if (rangeList == null) {
           rangeList = new ArrayList<Range>();
           tabletRanges.put(extent, rangeList);
         }
-        
+
         rangeList.add(range);
-        
+
         if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
           break;
         }
-        
+
         lastExtent = extent;
       }
-      
+
     }
-    
+
     return binnedRanges;
   }
-  
+
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
   public List<InputSplit> getSplits(JobContext job) throws IOException {
     return getSplits(job.getConfiguration());
   }
-  
+
   /*
    * TODO
    *  - I created the TableRange class; need to massage it into the getSplits code
@@ -940,20 +990,20 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public List<InputSplit> getSplits(Configuration conf) throws IOException {
     log.setLevel(getLogLevel(conf));
     validateOptions(conf);
-    
+
     boolean autoAdjust = getAutoAdjustRanges(conf);
     Map<String, List<Range>> tablesRanges = getRanges(conf);
     LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
-    
+
     for (Entry<String, List<Range>> tableRanges : tablesRanges.entrySet() ) {
       String tableName = tableRanges.getKey();
       List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableRanges.getValue()) : tableRanges.getValue();
-      
+
       if (ranges.isEmpty()) {
         ranges = new ArrayList<Range>(1);
         ranges.add(new Range());
       }
-      
+
       // get the metadata information for these ranges
       Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
       TabletLocator tl;
@@ -990,14 +1040,14 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       } catch (Exception e) {
         throw new IOException(e);
       }
-      
+
       HashMap<Range,ArrayList<String>> splitsToAdd = null;
-      
+
       if (!autoAdjust)
         splitsToAdd = new HashMap<Range,ArrayList<String>>();
-      
+
       HashMap<String,String> hostNameCache = new HashMap<String,String>();
-      
+
       for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
         String ip = tserverBin.getKey().split(":", 2)[0];
         String location = hostNameCache.get(ip);
@@ -1006,7 +1056,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
           location = inetAddress.getHostName();
           hostNameCache.put(ip, location);
         }
-        
+
         for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
           Range ke = extentRanges.getKey().toDataRange();
           for (Range r : extentRanges.getValue()) {
@@ -1024,14 +1074,14 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
           }
         }
       }
-      
+
       if (!autoAdjust)
         for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
           splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
     }
     return splits;
   }
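When auto-adjust is enabled, overlapping ranges for a table are merged before
binning, so no split ever covers the same data twice. A small illustration of
the merge step:

    List<Range> ranges = new ArrayList<Range>();
    ranges.add(new Range("a", "k"));
    ranges.add(new Range("f", "p"));
    // mergeOverlapping collapses these to the single range ["a", "p"].
    List<Range> merged = Range.mergeOverlapping(ranges);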
-  
+
   /**
    * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
    */
@@ -1039,40 +1089,40 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     private Range range;
     private String[] locations;
     private String tableName;
-    
+
     public RangeInputSplit() {
       range = new Range();
       locations = new String[0];
       tableName = "";
     }
-    
+
     public RangeInputSplit(RangeInputSplit split) throws IOException {
       this.setRange(split.getRange());
       this.setLocations(split.getLocations());
     }
-    
+
     RangeInputSplit(String table, Range range, String[] locations) {
       this.tableName = table;
       this.range = range;
       this.locations = locations;
     }
-    
+
     public Range getRange() {
       return range;
     }
-    
+
     public void setRange(Range range) {
       this.range = range;
     }
-    
+
     public String getTableName() {
       return tableName;
     }
-    
+
     public void setTableName(String tableName) {
       this.tableName = tableName;
     }
-    
+
     private static byte[] extractBytes(ByteSequence seq, int numBytes) {
       byte[] bytes = new byte[numBytes + 1];
       bytes[0] = 0;
@@ -1084,7 +1134,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       }
       return bytes;
     }
-    
+
     public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
       int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
       BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
@@ -1092,7 +1142,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
       return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
     }
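A quick spot-check of the interpolation above, using the stock
ArrayByteSequence implementation from org.apache.accumulo.core.data:

    // "b" lies halfway between "a" and "c": the zero-prefixed arrays decode
    // to 97, 99 and 98, so the result is (98 - 97) / (99 - 97) = 0.5f.
    float p = getProgress(new ArrayByteSequence("a"),
        new ArrayByteSequence("c"), new ArrayByteSequence("b"));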
-    
+
     public float getProgress(Key currentKey) {
       if (currentKey == null)
         return 0f;
@@ -1111,9 +1161,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       // if we can't figure it out, then claim no progress
       return 0f;
     }
-    
 
-    
+
+
     /**
      * This implementation of length is only an estimate; it does not provide exact values. Do not rely on this return value.
      */
@@ -1122,28 +1172,28 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
       int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
       long diff = 0;
-      
+
       byte[] start = startRow.getBytes();
       byte[] stop = stopRow.getBytes();
       for (int i = 0; i < maxCommon; ++i) {
         diff |= 0xff & (start[i] ^ stop[i]);
         diff <<= Byte.SIZE;
       }
-      
+
       if (startRow.getLength() != stopRow.getLength())
         diff |= 0xff;
-      
+
       return diff + 1;
     }
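A worked example of the estimate, for intuition:

    // startRow "a" (0x61), stopRow "c" (0x63): maxCommon = 1, so
    // diff = (0x61 ^ 0x63) << 8 = 0x200; the lengths match, so no 0xff
    // is OR-ed in, and getLength() returns 0x200 + 1 = 513.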
-    
+
     public String[] getLocations() throws IOException {
       return locations;
     }
-    
+
     public void setLocations(String[] locations) {
       this.locations = locations;
     }
-    
+
     public void readFields(DataInput in) throws IOException {
       range.readFields(in);
       int numLocs = in.readInt();
@@ -1152,7 +1202,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
         locations[i] = in.readUTF();
       tableName = in.readUTF();
     }
-    
+
     public void write(DataOutput out) throws IOException {
       range.write(out);
       out.writeInt(locations.length);
@@ -1161,27 +1211,27 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       out.writeUTF(tableName);
     }
   }
-  
+
   /**
    * Pairs together a table name and a range.
    */
   static class TableRange implements Writable {
     private String tableName;
     private Range range;
-    
+
     public TableRange() {
       range = new Range();
     }
-    
+
     public TableRange(String tableName, Range range) {
       this.tableName = tableName;
       this.range = range;
     }
-    
+
     public String tableName() {
       return tableName;
     }
-    
+
     public Range range() {
       return range;
     }
@@ -1197,9 +1247,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       out.writeUTF(tableName);
       range.write(out);
     }
-    
+
   }
-  
+
   /**
    * The Class IteratorSetting. Encapsulates specifics for an Accumulo iterator's name & priority.
    */

