accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cjno...@apache.org
Subject [2/2] git commit: Squashed commit of the following:
Date Sat, 05 Oct 2013 02:28:32 GMT
Squashed commit of the following:

commit 3227a822379718d6c1297f11d7af37a716f78a60
Author: Corey J. Nolet <cjnolet@gmail.com>
Date:   Tue Oct 1 23:20:34 2013 -0400

    Adding the following:
    - Deprecation to InputConfigurator, mapred.InputFormatBase, mapreduce.InputFormatBase
    - Comments to TableQueryConfig
    - Multi-table support to mapred.InputFormatBase

    ACCUMULO-391

commit 6648e8a1c97939f740b24f9368ecda9f7072cbd2
Author: Corey J. Nolet <cjnolet@gmail.com>
Date:   Tue Oct 1 21:45:37 2013 -0400

    Fixing some more formatting. Adding license headers. ACCUMULO-391

commit 53bcc85689510fc988c9e9f6aff0da0cb7091c6c
Author: Corey J. Nolet <cjnolet@gmail.com>
Date:   Mon Sep 30 21:01:55 2013 -0400

    Cleaning up tests. Adding test for legacy input for base + new multi-table methods. ACCUMULO-391

commit e4e05c804ea7f486290181f0246cf6b2880f5d1a
Author: Corey J. Nolet <cjnolet@gmail.com>
Date:   Sun Sep 29 21:05:55 2013 -0400

    Fixing some formatting. Adding some comments. ACCUMULO-391

commit 10b4eb8206ab4395ef2d4df375b52a7ffe77d655
Author: Corey J. Nolet <cjnolet@gmail.com>
Date:   Sun Sep 29 20:37:07 2013 -0400

    ACCUMULO-1732 Using table id in RangeInputSplit so that it can be resolved back to "working" table name in mappers. Scanner uses the "working" table name while everything else can still safely use the original configured table name.

commit 7b8585f0333c09674f7612b4dc24887f684413fe
Author: Corey J. Nolet <cjnolet@gmail.com>
Date:   Sat Sep 28 23:23:48 2013 -0400

    Removing deprecation for now until we have some discussions. Updating/adding comments. ACCUMULO-391

commit 273ee49530de28c2c5dfe39c80ab0c90c3c3a95f
Author: Corey J. Nolet <cjnolet@gmail.com>
Date:   Sat Sep 28 23:01:04 2013 -0400

    The legacy mapred InputFormatBase now verifies (and fixes the scanner for) a possible change in table name that could happen between the configuration of the map/reduce job and the actual processing of the scanner for a specific split. In that case, the most recent table name associated with the id is always used for the scanner (though the table name that was expected during job setup is still used in the RangeInputSplit). ACCUMULO-391

commit e6a7c962f707487d832ba4b16c1f9066d13ff8f1
Author: Corey J. Nolet <cjnolet@gmail.com>
Date:   Sat Sep 28 22:53:42 2013 -0400

    The original single-table setters/getters now populate a "default" TableQueryConfig object under the hood. This should make the switch over much easier. Deprecated single table methods in light of the API changes for the new configuration object. ACCUMULO-391

commit fdf4cadb16c29fc03a610cf83399ee26d7f83bc9
Author: Corey J. Nolet <cjnolet@gmail.com>
Date:   Sat Sep 28 21:58:40 2013 -0400

    Adding new TableQueryConfig object for setting multiple table info in the InputFormatBase


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b96701f2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b96701f2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b96701f2

Branch: refs/heads/master
Commit: b96701f220ecb3e891a71741179b867429fa1d39
Parents: 7da1164
Author: Corey J. Nolet <cjnolet@gmail.com>
Authored: Wed Oct 2 20:43:43 2013 -0400
Committer: Corey J. Nolet <cjnolet@gmail.com>
Committed: Wed Oct 2 20:43:43 2013 -0400

----------------------------------------------------------------------
 .../core/client/mapred/AccumuloInputFormat.java |   1 +
 .../core/client/mapred/InputFormatBase.java     | 440 ++++++++++-------
 .../client/mapreduce/AccumuloInputFormat.java   |   2 +-
 .../core/client/mapreduce/InputFormatBase.java  | 475 ++++++++++++-------
 .../mapreduce/lib/util/InputConfigurator.java   | 346 ++++++++++----
 .../accumulo/core/conf/TableQueryConfig.java    | 369 ++++++++++++++
 .../accumulo/core/util/ArgumentChecker.java     |   5 +
 .../client/mapred/AccumuloInputFormatTest.java  |  86 +++-
 .../mapreduce/AccumuloInputFormatTest.java      |  89 +++-
 .../core/conf/TableQueryConfigTest.java         |  91 ++++
 10 files changed, 1452 insertions(+), 452 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
index bbbd0c3..b3b9fc7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.Reporter;
  * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)}
  * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)}
  * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)}
+ * <li>{@link AccumuloInputFormat#setTableQueryConfigs(JobConf, org.apache.accumulo.core.conf.TableQueryConfig...)}</li>
  * </ul>
  * 
  * Other static methods are optional.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
index c796cd2..ae51581 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -47,6 +48,7 @@ import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.conf.TableQueryConfig;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.PartialKey;
@@ -68,6 +70,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs.
  * <p>
@@ -79,10 +83,10 @@ import org.apache.log4j.Logger;
  * See {@link AccumuloInputFormat} for an example implementation.
  */
 public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
-
+  
   private static final Class<?> CLASS = AccumuloInputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
-
+  
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -102,7 +106,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
     InputConfigurator.setConnectorInfo(CLASS, job, principal, token);
   }
-
+  
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -121,7 +125,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
     InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
   }
-
+  
   /**
    * Determines if the connector has been configured.
    * 
@@ -134,7 +138,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Boolean isConnectorInfoSet(JobConf job) {
     return InputConfigurator.isConnectorInfoSet(CLASS, job);
   }
-
+  
   /**
    * Gets the user name from the configuration.
    * 
@@ -147,7 +151,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static String getPrincipal(JobConf job) {
     return InputConfigurator.getPrincipal(CLASS, job);
   }
-
+  
   /**
    * Gets the serialized token class from either the configuration or the token file.
    * 
@@ -158,7 +162,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static String getTokenClass(JobConf job) {
     return getAuthenticationToken(job).getClass().getName();
   }
-
+  
   /**
    * Gets the serialized token from either the configuration or the token file.
    * 
@@ -169,7 +173,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static byte[] getToken(JobConf job) {
     return AuthenticationTokenSerializer.serialize(getAuthenticationToken(job));
   }
-
+  
   /**
    * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured.
    * 
@@ -183,7 +187,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static AuthenticationToken getAuthenticationToken(JobConf job) {
     return InputConfigurator.getAuthenticationToken(CLASS, job);
   }
-
+  
   /**
    * Configures a {@link ZooKeeperInstance} for this job.
    * 
@@ -198,7 +202,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
     InputConfigurator.setZooKeeperInstance(CLASS, job, instanceName, zooKeepers);
   }
-
+  
   /**
    * Configures a {@link MockInstance} for this job.
    * 
@@ -211,7 +215,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setMockInstance(JobConf job, String instanceName) {
     InputConfigurator.setMockInstance(CLASS, job, instanceName);
   }
-
+  
   /**
    * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
@@ -225,7 +229,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Instance getInstance(JobConf job) {
     return InputConfigurator.getInstance(CLASS, job);
   }
-
+  
   /**
    * Sets the log level for this job.
    * 
@@ -238,7 +242,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setLogLevel(JobConf job, Level level) {
     InputConfigurator.setLogLevel(CLASS, job, level);
   }
-
+  
   /**
    * Gets the log level from this configuration.
    * 
@@ -251,7 +255,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Level getLogLevel(JobConf job) {
     return InputConfigurator.getLogLevel(CLASS, job);
   }
-
+  
   /**
    * Sets the name of the input table, over which this job will scan.
    * 
@@ -260,11 +264,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param tableName
    *          the table to use when the tablename is null in the write call
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setInputTableName(JobConf job, String tableName) {
     InputConfigurator.setInputTableName(CLASS, job, tableName);
   }
-
+  
   /**
    * Gets the table name from the configuration.
    * 
@@ -273,11 +279,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return the table name
    * @since 1.5.0
    * @see #setInputTableName(JobConf, String)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static String getInputTableName(JobConf job) {
     return InputConfigurator.getInputTableName(CLASS, job);
   }
-
+  
   /**
    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
    * 
@@ -290,7 +298,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setScanAuthorizations(JobConf job, Authorizations auths) {
     InputConfigurator.setScanAuthorizations(CLASS, job, auths);
   }
-
+  
   /**
    * Gets the authorizations to set for the scans from the configuration.
    * 
@@ -303,7 +311,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Authorizations getScanAuthorizations(JobConf job) {
     return InputConfigurator.getScanAuthorizations(CLASS, job);
   }
-
+  
   /**
    * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
    * 
@@ -312,11 +320,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param ranges
    *          the ranges that will be mapped over
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setRanges(JobConf job, Collection<Range> ranges) {
     InputConfigurator.setRanges(CLASS, job, ranges);
   }
-
+  
   /**
    * Gets the ranges to scan over from a job.
    * 
@@ -327,11 +337,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    *           if the ranges have been encoded improperly
    * @since 1.5.0
    * @see #setRanges(JobConf, Collection)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static List<Range> getRanges(JobConf job) throws IOException {
     return InputConfigurator.getRanges(CLASS, job);
   }
-
+  
   /**
    * Restricts the columns that will be mapped over for this job.
    * 
@@ -341,11 +353,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
    *          selected. An empty set is the default and is equivalent to scanning the all columns.
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void fetchColumns(JobConf job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     InputConfigurator.fetchColumns(CLASS, job, columnFamilyColumnQualifierPairs);
   }
-
+  
   /**
    * Gets the columns to be mapped over from this job.
    * 
@@ -354,11 +368,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return a set of columns
    * @since 1.5.0
    * @see #fetchColumns(JobConf, Collection)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static Set<Pair<Text,Text>> getFetchedColumns(JobConf job) {
     return InputConfigurator.getFetchedColumns(CLASS, job);
   }
-
+  
   /**
    * Encode an iterator on the input for this job.
    * 
@@ -367,11 +383,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param cfg
    *          the configuration of the iterator
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void addIterator(JobConf job, IteratorSetting cfg) {
     InputConfigurator.addIterator(CLASS, job, cfg);
   }
-
+  
   /**
    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
    * 
@@ -380,11 +398,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return a list of iterators
    * @since 1.5.0
    * @see #addIterator(JobConf, IteratorSetting)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static List<IteratorSetting> getIterators(JobConf job) {
     return InputConfigurator.getIterators(CLASS, job);
   }
-
+  
   /**
    * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
    * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. *
@@ -398,11 +418,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    *          the feature is enabled if true, disabled otherwise
    * @see #setRanges(JobConf, Collection)
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setAutoAdjustRanges(JobConf job, boolean enableFeature) {
     InputConfigurator.setAutoAdjustRanges(CLASS, job, enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration has auto-adjust ranges enabled.
    * 
@@ -411,11 +433,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return false if the feature is disabled, true otherwise
    * @since 1.5.0
    * @see #setAutoAdjustRanges(JobConf, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean getAutoAdjustRanges(JobConf job) {
     return InputConfigurator.getAutoAdjustRanges(CLASS, job);
   }
-
+  
   /**
    * Controls the use of the {@link IsolatedScanner} in this job.
    * 
@@ -427,11 +451,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setScanIsolation(JobConf job, boolean enableFeature) {
     InputConfigurator.setScanIsolation(CLASS, job, enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration has isolation enabled.
    * 
@@ -440,11 +466,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setScanIsolation(JobConf, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean isIsolated(JobConf job) {
     return InputConfigurator.isIsolated(CLASS, job);
   }
-
+  
   /**
    * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
    * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
@@ -457,11 +485,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setLocalIterators(JobConf job, boolean enableFeature) {
     InputConfigurator.setLocalIterators(CLASS, job, enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration uses local iterators.
    * 
@@ -470,11 +500,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setLocalIterators(JobConf, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean usesLocalIterators(JobConf job) {
     return InputConfigurator.usesLocalIterators(CLASS, job);
   }
-
+  
   /**
    * <p>
    * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
@@ -505,11 +537,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setOfflineTableScan(JobConf job, boolean enableFeature) {
     InputConfigurator.setOfflineTableScan(CLASS, job, enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration has the offline table scan feature enabled.
    * 
@@ -518,11 +552,61 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setOfflineTableScan(JobConf, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean isOfflineScan(JobConf job) {
     return InputConfigurator.isOfflineScan(CLASS, job);
   }
-
+  
+  /**
+   * Sets the {@link TableQueryConfig} objects on the given Hadoop configuration
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param configs
+   *          the table query configs to be set on the configuration.
+   * @since 1.6.0
+   */
+  public static void setTableQueryConfigs(JobConf job, TableQueryConfig... configs) {
+    checkNotNull(configs);
+    InputConfigurator.setTableQueryConfigs(CLASS, job, configs);
+  }
+  
+  /**
+   * Fetches all {@link TableQueryConfig}s that have been set on the given Hadoop configuration.
+   * 
+   * <p>
+   * Note this also returns the {@link TableQueryConfig} representing the table configurations set through the single table input methods (
+   * {@link #setInputTableName(JobConf, String)}, {@link #setRanges(JobConf, java.util.Collection)}, {@link #fetchColumns(JobConf, java.util.Collection)},
+   * {@link #addIterator(JobConf, IteratorSetting)}, etc...)
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @return
+   * @since 1.6.0
+   */
+  public static List<TableQueryConfig> getTableQueryConfigs(JobConf job) {
+    return InputConfigurator.getTableQueryConfigs(CLASS, job);
+  }
+  
+  /**
+   * Fetches a {@link TableQueryConfig} that has been set on the configuration for a specific table.
+   * 
+   * <p>
+   * null is returned in the event that the table doesn't exist.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param tableName
+   *          the table name for which to grab the config object
+   * @return the {@link TableQueryConfig} for the given table
+   * @since 1.6.0
+   */
+  public static TableQueryConfig getTableQueryConfig(JobConf job, String tableName) {
+    return InputConfigurator.getTableQueryConfig(CLASS, job, tableName);
+  }
+  
   /**
    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
    * 
@@ -533,13 +617,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  protected static TabletLocator getTabletLocator(JobConf job) throws TableNotFoundException {
-    return InputConfigurator.getTabletLocator(CLASS, job);
+  protected static TabletLocator getTabletLocator(JobConf job, String tableName) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, job, tableName);
   }
-
+  
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
-   * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
+   * Check whether a configuration is fully configured to be used with an Accumulo {@link InputFormat}.
    * 
    * @param job
    *          the Hadoop context for the configured job
@@ -550,7 +634,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static void validateOptions(JobConf job) throws IOException {
     InputConfigurator.validateOptions(CLASS, job);
   }
-
+  
   /**
    * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
    * types.
@@ -565,7 +649,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
     protected long numKeysRead;
     protected Iterator<Entry<Key,Value>> scannerIterator;
     protected RangeInputSplit split;
-
+    
     /**
      * Apply the configured iterators from the configuration to the scanner.
      * 
@@ -574,13 +658,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
      * @param scanner
      *          the scanner to configure
      */
-    protected void setupIterators(JobConf job, Scanner scanner) {
-      List<IteratorSetting> iterators = getIterators(job);
-      for (IteratorSetting iterator : iterators) {
+    protected void setupIterators(JobConf job, Scanner scanner, String tableName) {
+      TableQueryConfig config = getTableQueryConfig(job, tableName);
+      List<IteratorSetting> iterators = config.getIterators();
+      for (IteratorSetting iterator : iterators)
         scanner.addScanIterator(iterator);
-      }
     }
-
+    
     /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
@@ -592,32 +676,47 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       String user = getPrincipal(job);
       AuthenticationToken token = getAuthenticationToken(job);
       Authorizations authorizations = getScanAuthorizations(job);
-
+      
+      TableQueryConfig tableConfig = getTableQueryConfig(job, split.getTableName());
+      
+      // in case the table name changed, we can still use the previous name for terms of configuration,
+      // but for the scanner, we'll need to reference the new table name.
+      String actualNameForId = split.getTableName();
+      if (!(instance instanceof MockInstance)) {
+        try {
+          actualNameForId = Tables.getTableName(instance, split.getTableId());
+          if (!actualNameForId.equals(split.getTableName()))
+            log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
+        } catch (TableNotFoundException e) {
+          throw new IOException("The specified table was not found for id=" + split.getTableId());
+        }
+      }
+      
       try {
         log.debug("Creating connector with user: " + user);
         Connector conn = instance.getConnector(user, token);
         log.debug("Creating scanner for table: " + getInputTableName(job));
         log.debug("Authorizations are: " + authorizations);
-        if (isOfflineScan(job)) {
-          scanner = new OfflineScanner(instance, new Credentials(user, token), Tables.getTableId(instance, getInputTableName(job)), authorizations);
+        if (tableConfig.isOfflineScan()) {
+          scanner = new OfflineScanner(instance, new Credentials(user, token), split.getTableId(), authorizations);
         } else {
-          scanner = conn.createScanner(getInputTableName(job), authorizations);
+          scanner = conn.createScanner(actualNameForId, authorizations);
         }
-        if (isIsolated(job)) {
+        if (tableConfig.shouldUseIsolatedScanners()) {
           log.info("Creating isolated scanner");
           scanner = new IsolatedScanner(scanner);
         }
-        if (usesLocalIterators(job)) {
+        if (tableConfig.shouldUseLocalIterators()) {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupIterators(job, scanner);
+        setupIterators(job, scanner, split.getTableName());
       } catch (Exception e) {
         throw new IOException(e);
       }
-
+      
       // setup a scanner within the bounds of this split
-      for (Pair<Text,Text> c : getFetchedColumns(job)) {
+      for (Pair<Text,Text> c : tableConfig.getFetchedColumns()) {
         if (c.getSecond() != null) {
           log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
           scanner.fetchColumn(c.getFirst(), c.getSecond());
@@ -626,58 +725,58 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
           scanner.fetchColumnFamily(c.getFirst());
         }
       }
-
+      
       scanner.setRange(split.getRange());
-
+      
       numKeysRead = 0;
-
+      
       // do this last after setting all scanner options
       scannerIterator = scanner.iterator();
     }
-
+    
     @Override
     public void close() {}
-
+    
     @Override
     public long getPos() throws IOException {
       return numKeysRead;
     }
-
+    
     @Override
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
       return split.getProgress(currentKey);
     }
-
+    
     protected Key currentKey = null;
-
+    
   }
-
+  
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableName, List<Range> ranges) throws TableNotFoundException, AccumuloException,
       AccumuloSecurityException {
-
+    
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
+    
     Instance instance = getInstance(job);
     Connector conn = instance.getConnector(getPrincipal(job), getAuthenticationToken(job));
     String tableId = Tables.getTableId(instance, tableName);
-
+    
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
       Tables.clearCache(instance);
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
         throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
       }
     }
-
+    
     for (Range range : ranges) {
       Text startRow;
-
+      
       if (range.getStartKey() != null)
         startRow = range.getStartKey().getRow();
       else
         startRow = new Text();
-
+      
       Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
       Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
@@ -685,73 +784,73 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
       scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
       scanner.setRange(metadataRange);
-
+      
       RowIterator rowIter = new RowIterator(scanner);
-
+      
       KeyExtent lastExtent = null;
-
+      
       while (rowIter.hasNext()) {
         Iterator<Entry<Key,Value>> row = rowIter.next();
         String last = "";
         KeyExtent extent = null;
         String location = null;
-
+        
         while (row.hasNext()) {
           Entry<Key,Value> entry = row.next();
           Key key = entry.getKey();
-
+          
           if (key.getColumnFamily().equals(TabletsSection.LastLocationColumnFamily.NAME)) {
             last = entry.getValue().toString();
           }
-
+          
           if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
               || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
             location = entry.getValue().toString();
           }
-
+          
           if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
             extent = new KeyExtent(key.getRow(), entry.getValue());
           }
-
+          
         }
-
+        
         if (location != null)
           return null;
-
+        
         if (!extent.getTableId().toString().equals(tableId)) {
           throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
         }
-
+        
         if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
           throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
         }
-
+        
         Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
         if (tabletRanges == null) {
           tabletRanges = new HashMap<KeyExtent,List<Range>>();
           binnedRanges.put(last, tabletRanges);
         }
-
+        
         List<Range> rangeList = tabletRanges.get(extent);
         if (rangeList == null) {
           rangeList = new ArrayList<Range>();
           tabletRanges.put(extent, rangeList);
         }
-
+        
         rangeList.add(range);
-
+        
         if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
           break;
         }
-
+        
         lastExtent = extent;
       }
-
+      
     }
-
+    
     return binnedRanges;
   }
-
+  
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
@@ -759,110 +858,111 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     log.setLevel(getLogLevel(job));
     validateOptions(job);
-
-    String tableName = getInputTableName(job);
-    boolean autoAdjust = getAutoAdjustRanges(job);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job)) : getRanges(job);
-
-    if (ranges.isEmpty()) {
-      ranges = new ArrayList<Range>(1);
-      ranges.add(new Range());
-    }
-
-    // get the metadata information for these ranges
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    TabletLocator tl;
-    try {
-      if (isOfflineScan(job)) {
-        binnedRanges = binOfflineTable(job, tableName, ranges);
-        while (binnedRanges == null) {
-          // Some tablets were still online, try again
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-          binnedRanges = binOfflineTable(job, tableName, ranges);
-        }
-      } else {
-        Instance instance = getInstance(job);
-        String tableId = null;
-        tl = getTabletLocator(job);
-        // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
-        tl.invalidateCache();
-        while (!tl.binRanges(new Credentials(getPrincipal(job), getAuthenticationToken(job)), ranges, binnedRanges).isEmpty()) {
-          if (!(instance instanceof MockInstance)) {
-            if (tableId == null)
-              tableId = Tables.getTableId(instance, tableName);
-            if (!Tables.exists(instance, tableId))
-              throw new TableDeletedException(tableId);
-            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-              throw new TableOfflineException(instance, tableId);
+    
+    LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
+    List<TableQueryConfig> tableConfigs = getTableQueryConfigs(job);
+    for (TableQueryConfig tableConfig : tableConfigs) {
+      
+      boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
+      String tableId = null;
+      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
+      if (ranges.isEmpty()) {
+        ranges = new ArrayList<Range>(1);
+        ranges.add(new Range());
+      }
+      
+      // get the metadata information for these ranges
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+      TabletLocator tl;
+      try {
+        if (tableConfig.isOfflineScan()) {
+          binnedRanges = binOfflineTable(job, tableConfig.getTableName(), ranges);
+          while (binnedRanges == null) {
+            // Some tablets were still online, try again
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            binnedRanges = binOfflineTable(job, tableConfig.getTableName(), ranges);
           }
-          binnedRanges.clear();
-          log.warn("Unable to locate bins for specified ranges. Retrying.");
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+        } else {
+          Instance instance = getInstance(job);
+          tl = getTabletLocator(job, tableConfig.getTableName());
+          // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
           tl.invalidateCache();
+          Credentials creds = new Credentials(getPrincipal(job), getAuthenticationToken(job));
+          
+          while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
+            if (!(instance instanceof MockInstance)) {
+              if (!Tables.exists(instance, tableId))
+                throw new TableDeletedException(tableId);
+              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+                throw new TableOfflineException(instance, tableId);
+              tableId = Tables.getTableId(instance, tableConfig.getTableName());
+            }
+            binnedRanges.clear();
+            log.warn("Unable to locate bins for specified ranges. Retrying.");
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            tl.invalidateCache();
+          }
         }
+      } catch (Exception e) {
+        throw new IOException(e);
       }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
-    HashMap<Range,ArrayList<String>> splitsToAdd = null;
-
-    if (!autoAdjust)
-      splitsToAdd = new HashMap<Range,ArrayList<String>>();
-
-    HashMap<String,String> hostNameCache = new HashMap<String,String>();
-
-    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-      String ip = tserverBin.getKey().split(":", 2)[0];
-      String location = hostNameCache.get(ip);
-      if (location == null) {
-        InetAddress inetAddress = InetAddress.getByName(ip);
-        location = inetAddress.getHostName();
-        hostNameCache.put(ip, location);
-      }
-
-      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-        Range ke = extentRanges.getKey().toDataRange();
-        for (Range r : extentRanges.getValue()) {
-          if (autoAdjust) {
-            // divide ranges into smaller ranges, based on the tablets
-            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
-          } else {
-            // don't divide ranges
-            ArrayList<String> locations = splitsToAdd.get(r);
-            if (locations == null)
-              locations = new ArrayList<String>(1);
-            locations.add(location);
-            splitsToAdd.put(r, locations);
+      
+      HashMap<Range,ArrayList<String>> splitsToAdd = null;
+      
+      if (!autoAdjust)
+        splitsToAdd = new HashMap<Range,ArrayList<String>>();
+      
+      HashMap<String,String> hostNameCache = new HashMap<String,String>();
+      for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+        String ip = tserverBin.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getHostName();
+          hostNameCache.put(ip, location);
+        }
+        for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+          Range ke = extentRanges.getKey().toDataRange();
+          for (Range r : extentRanges.getValue()) {
+            if (autoAdjust) {
+              // divide ranges into smaller ranges, based on the tablets
+              splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, ke.clip(r), new String[] {location}));
+            } else {
+              // don't divide ranges
+              ArrayList<String> locations = splitsToAdd.get(r);
+              if (locations == null)
+                locations = new ArrayList<String>(1);
+              locations.add(location);
+              splitsToAdd.put(r, locations);
+            }
           }
         }
       }
+      
+      if (!autoAdjust)
+        for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
+          splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, entry.getKey(), entry.getValue().toArray(new String[0])));
     }
-
-    if (!autoAdjust)
-      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
-        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
+    
     return splits.toArray(new InputSplit[splits.size()]);
   }
-
+  
   /**
    * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
    */
   public static class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit implements InputSplit {
-
+    
     public RangeInputSplit() {
       super();
     }
-
+    
     public RangeInputSplit(RangeInputSplit split) throws IOException {
       super(split);
     }
-
-    protected RangeInputSplit(String table, Range range, String[] locations) {
-      super(table, range, locations);
+    
+    protected RangeInputSplit(String table, String tableId, Range range, String[] locations) {
+      super(table, tableId, range, locations);
     }
-
   }
-
+  
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
index 1cbb606..44dfc33 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
@@ -39,9 +39,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  * <ul>
  * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
  * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, String)}
- * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)}
  * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}
  * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)}
+ * <li>{@link AccumuloInputFormat#setTableQueryConfigs(Job, org.apache.accumulo.core.conf.TableQueryConfig...)} 
  * </ul>
  * 
  * Other static methods are optional.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 13f9708..acff7e2 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -16,6 +16,9 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -26,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -50,7 +54,7 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.conf.TableQueryConfig;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -157,6 +161,21 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   }
 
   /**
+   * Gets the table name from the configuration.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the table name
+   * @since 1.5.0
+   * @see #setInputTableName(Job, String)
+   * @deprecated since 1.6.0
+   */
+  @Deprecated
+  protected static String getInputTableName(JobContext context) {
+    return InputConfigurator.getInputTableName(CLASS, getConfiguration(context));
+  }
+
+  /**
    * Gets the serialized token class from either the configuration or the token file.
    * 
    * @since 1.5.0
@@ -268,25 +287,14 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param tableName
    *          the table to use when the tablename is null in the write call
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setInputTableName(Job job, String tableName) {
     InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
   }
 
   /**
-   * Gets the table name from the configuration.
-   * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the table name
-   * @since 1.5.0
-   * @see #setInputTableName(Job, String)
-   */
-  protected static String getInputTableName(JobContext context) {
-    return InputConfigurator.getInputTableName(CLASS, getConfiguration(context));
-  }
-
-  /**
    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
    * 
    * @param job
@@ -313,14 +321,16 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   }
 
   /**
-   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
+   * Sets the input ranges to scan for all tables associated with this job.
    * 
    * @param job
    *          the Hadoop job instance to be configured
    * @param ranges
    *          the ranges that will be mapped over
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setRanges(Job job, Collection<Range> ranges) {
     InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
   }
@@ -331,17 +341,18 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param context
    *          the Hadoop context for the configured job
    * @return the ranges
-   * @throws IOException
-   *           if the ranges have been encoded improperly
    * @since 1.5.0
    * @see #setRanges(Job, Collection)
+   * @see #setRanges(org.apache.hadoop.mapreduce.Job, java.util.Collection)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static List<Range> getRanges(JobContext context) throws IOException {
     return InputConfigurator.getRanges(CLASS, getConfiguration(context));
   }
 
   /**
-   * Restricts the columns that will be mapped over for this job.
+   * Restricts the columns that will be mapped over for this job for the default input table.
    * 
    * @param job
    *          the Hadoop job instance to be configured
@@ -349,7 +360,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
    *          selected. An empty set is the default and is equivalent to scanning the all columns.
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
   }
@@ -362,20 +375,24 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return a set of columns
    * @since 1.5.0
    * @see #fetchColumns(Job, Collection)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
     return InputConfigurator.getFetchedColumns(CLASS, getConfiguration(context));
   }
 
   /**
-   * Encode an iterator on the input for this job.
+   * Encode an iterator on the default all tables for this job.
    * 
    * @param job
    *          the Hadoop job instance to be configured
    * @param cfg
    *          the configuration of the iterator
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void addIterator(Job job, IteratorSetting cfg) {
     InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
   }
@@ -388,7 +405,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return a list of iterators
    * @since 1.5.0
    * @see #addIterator(Job, IteratorSetting)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static List<IteratorSetting> getIterators(JobContext context) {
     return InputConfigurator.getIterators(CLASS, getConfiguration(context));
   }
@@ -406,7 +425,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the feature is enabled if true, disabled otherwise
    * @see #setRanges(Job, Collection)
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
     InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
   }
@@ -419,12 +440,63 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return false if the feature is disabled, true otherwise
    * @since 1.5.0
    * @see #setAutoAdjustRanges(Job, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean getAutoAdjustRanges(JobContext context) {
     return InputConfigurator.getAutoAdjustRanges(CLASS, getConfiguration(context));
   }
 
   /**
+   * Sets the {@link TableQueryConfig} objects on the given Hadoop configuration
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param configs
+   *          the table query configs to be set on the configuration.
+   * @since 1.6.0
+   */
+  public static void setTableQueryConfigs(Job job, TableQueryConfig... configs) {
+    checkNotNull(configs);
+    InputConfigurator.setTableQueryConfigs(CLASS, getConfiguration(job), configs);
+  }
+
+  /**
+   * Fetches all {@link TableQueryConfig}s that have been set on the given Hadoop configuration.
+   * 
+   * <p>
+   * Note this also returns the {@link TableQueryConfig} representing the table configurations set through the single table input methods like
+   * {@link #setInputTableName(org.apache.hadoop.mapreduce.Job, String)}, {@link #setRanges(org.apache.hadoop.mapreduce.Job, java.util.Collection)},
+   * {@link #fetchColumns(org.apache.hadoop.mapreduce.Job, java.util.Collection)},
+   * {@link #addIterator(org.apache.hadoop.mapreduce.Job, org.apache.accumulo.core.client.IteratorSetting)}, etc...)
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @return
+   * @since 1.6.0
+   */
+  public static List<TableQueryConfig> getTableQueryConfigs(JobContext job) {
+    return InputConfigurator.getTableQueryConfigs(CLASS, getConfiguration(job));
+  }
+
+  /**
+   * Fetches a {@link TableQueryConfig} that has been set on the configuration for a specific table.
+   * 
+   * <p>
+   * null is returned in the event that the table doesn't exist.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param tableName
+   *          the table name for which to grab the config object
+   * @return the {@link TableQueryConfig} for the given table
+   * @since 1.6.0
+   */
+  public static TableQueryConfig getTableQueryConfig(JobContext job, String tableName) {
+    return InputConfigurator.getTableQueryConfig(CLASS,getConfiguration(job),tableName);
+  }
+
+  /**
    * Controls the use of the {@link IsolatedScanner} in this job.
    * 
    * <p>
@@ -435,7 +507,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setScanIsolation(Job job, boolean enableFeature) {
     InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
   }
@@ -448,7 +522,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setScanIsolation(Job, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean isIsolated(JobContext context) {
     return InputConfigurator.isIsolated(CLASS, getConfiguration(context));
   }
@@ -465,7 +541,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setLocalIterators(Job job, boolean enableFeature) {
     InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
   }
@@ -478,7 +556,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setLocalIterators(Job, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean usesLocalIterators(JobContext context) {
     return InputConfigurator.usesLocalIterators(CLASS, getConfiguration(context));
   }
@@ -513,7 +593,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setOfflineTableScan(Job job, boolean enableFeature) {
     InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
   }
@@ -526,7 +608,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setOfflineTableScan(Job, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean isOfflineScan(JobContext context) {
     return InputConfigurator.isOfflineScan(CLASS, getConfiguration(context));
   }
@@ -536,13 +620,15 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * 
    * @param context
    *          the Hadoop context for the configured job
+   * @param table
+   *          the table for which to initialize the locator
    * @return an Accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
-    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context));
+  protected static TabletLocator getTabletLocator(JobContext context, String table) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), table);
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -575,60 +661,79 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     protected long numKeysRead;
     protected Iterator<Entry<Key,Value>> scannerIterator;
     protected RangeInputSplit split;
-
+  
     /**
-     * Apply the configured iterators from the configuration to the scanner.
+     * Apply the configured iterators from the configuration to the scanner. This applies both the default iterators and the per-table iterators.
      * 
      * @param context
      *          the Hadoop context for the configured job
      * @param scanner
      *          the scanner to configure
+     * @param tableName
+     *          the table name for which to set up the iterators
      */
-    protected void setupIterators(TaskAttemptContext context, Scanner scanner) {
-      List<IteratorSetting> iterators = getIterators(context);
-      for (IteratorSetting iterator : iterators) {
+    protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName) {
+      TableQueryConfig config = getTableQueryConfig(context, tableName);
+      List<IteratorSetting> iterators = config.getIterators();
+      for (IteratorSetting iterator : iterators)
         scanner.addScanIterator(iterator);
-      }
     }
-
+  
     /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
     @Override
     public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
+    
       Scanner scanner;
       split = (RangeInputSplit) inSplit;
-      log.debug("Initializing input split: " + split.range);
+      log.debug("Initializing input split: " + split.getRange());
       Instance instance = getInstance(attempt);
       String principal = getPrincipal(attempt);
+    
+      TableQueryConfig tableConfig = getTableQueryConfig(attempt, split.getTableName());
+    
+      // in case the table name changed, we can still use the previous name for terms of configuration,
+      // but for the scanner, we'll need to reference the new table name.
+      String actualNameForId = split.getTableName();
+      if (!(instance instanceof MockInstance)) {
+        try {
+          actualNameForId = Tables.getTableName(instance, split.getTableId());
+          if (!actualNameForId.equals(split.getTableName()))
+            log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
+        } catch (TableNotFoundException e) {
+          throw new IOException("The specified table was not found for id=" + split.getTableId());
+        }
+      }
+    
       AuthenticationToken token = getAuthenticationToken(attempt);
       Authorizations authorizations = getScanAuthorizations(attempt);
-
       try {
         log.debug("Creating connector with user: " + principal);
+      
         Connector conn = instance.getConnector(principal, token);
-        log.debug("Creating scanner for table: " + getInputTableName(attempt));
+        log.debug("Creating scanner for table: " + split.getTableName());
         log.debug("Authorizations are: " + authorizations);
-        if (isOfflineScan(attempt)) {
-          scanner = new OfflineScanner(instance, new Credentials(principal, token), Tables.getTableId(instance, getInputTableName(attempt)), authorizations);
+        if (tableConfig.isOfflineScan()) {
+          scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
         } else {
-          scanner = conn.createScanner(getInputTableName(attempt), authorizations);
+          scanner = conn.createScanner(actualNameForId, authorizations);
         }
-        if (isIsolated(attempt)) {
+        if (tableConfig.shouldUseIsolatedScanners()) {
           log.info("Creating isolated scanner");
           scanner = new IsolatedScanner(scanner);
         }
-        if (usesLocalIterators(attempt)) {
+        if (tableConfig.shouldUseLocalIterators()) {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupIterators(attempt, scanner);
+        setupIterators(attempt, scanner, split.getTableName());
       } catch (Exception e) {
         throw new IOException(e);
       }
-
+    
       // setup a scanner within the bounds of this split
-      for (Pair<Text,Text> c : getFetchedColumns(attempt)) {
+      for (Pair<Text,Text> c : tableConfig.getFetchedColumns()) {
         if (c.getSecond() != null) {
           log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
           scanner.fetchColumn(c.getFirst(), c.getSecond());
@@ -637,35 +742,34 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
           scanner.fetchColumnFamily(c.getFirst());
         }
       }
-
-      scanner.setRange(split.range);
-
+    
+      scanner.setRange(split.getRange());
       numKeysRead = 0;
-
+    
       // do this last after setting all scanner options
       scannerIterator = scanner.iterator();
     }
-
+  
     @Override
     public void close() {}
-
+  
     @Override
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
       return split.getProgress(currentKey);
     }
-
+  
     protected K currentK = null;
     protected V currentV = null;
     protected Key currentKey = null;
     protected Value currentValue = null;
-
+  
     @Override
     public K getCurrentKey() throws IOException, InterruptedException {
       return currentK;
     }
-
+  
     @Override
     public V getCurrentValue() throws IOException, InterruptedException {
       return currentV;
@@ -674,28 +778,28 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
 
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
       AccumuloException, AccumuloSecurityException {
-
+  
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
+  
     Instance instance = getInstance(context);
     Connector conn = instance.getConnector(getPrincipal(context), getAuthenticationToken(context));
     String tableId = Tables.getTableId(instance, tableName);
-
+  
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
       Tables.clearCache(instance);
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
         throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
       }
     }
-
+  
     for (Range range : ranges) {
       Text startRow;
-
+    
       if (range.getStartKey() != null)
         startRow = range.getStartKey().getRow();
       else
         startRow = new Text();
-
+    
       Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
       Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
@@ -703,164 +807,169 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
       scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
       scanner.setRange(metadataRange);
-
+    
       RowIterator rowIter = new RowIterator(scanner);
-
       KeyExtent lastExtent = null;
-
       while (rowIter.hasNext()) {
         Iterator<Entry<Key,Value>> row = rowIter.next();
         String last = "";
         KeyExtent extent = null;
         String location = null;
-
+      
         while (row.hasNext()) {
           Entry<Key,Value> entry = row.next();
           Key key = entry.getKey();
-
+        
           if (key.getColumnFamily().equals(TabletsSection.LastLocationColumnFamily.NAME)) {
             last = entry.getValue().toString();
           }
-
+        
           if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
               || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
             location = entry.getValue().toString();
           }
-
+        
           if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
             extent = new KeyExtent(key.getRow(), entry.getValue());
           }
-
+        
         }
-
+      
         if (location != null)
           return null;
-
+      
         if (!extent.getTableId().toString().equals(tableId)) {
           throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
         }
-
+      
         if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
           throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
         }
-
+      
         Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
         if (tabletRanges == null) {
           tabletRanges = new HashMap<KeyExtent,List<Range>>();
           binnedRanges.put(last, tabletRanges);
         }
-
+      
         List<Range> rangeList = tabletRanges.get(extent);
         if (rangeList == null) {
           rangeList = new ArrayList<Range>();
           tabletRanges.put(extent, rangeList);
         }
-
+      
         rangeList.add(range);
-
+      
         if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
           break;
         }
-
+      
         lastExtent = extent;
       }
-
+    
     }
-
+  
     return binnedRanges;
   }
 
   /**
-   * Read the metadata table to get tablets and match up ranges to them.
+   * Gets the splits of the tables that have been set on the job.
+   * 
+   * @param conf
+   *          the configuration of the job
+   * @return the splits from the tables based on the ranges.
+   * @throws IOException
+   *           if a table set on the job doesn't exist or an error occurs initializing the tablet locator
    */
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException {
-    log.setLevel(getLogLevel(context));
-    validateOptions(context);
-
-    String tableName = getInputTableName(context);
-    boolean autoAdjust = getAutoAdjustRanges(context);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
-
-    if (ranges.isEmpty()) {
-      ranges = new ArrayList<Range>(1);
-      ranges.add(new Range());
-    }
-
-    // get the metadata information for these ranges
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    TabletLocator tl;
-    try {
-      if (isOfflineScan(context)) {
-        binnedRanges = binOfflineTable(context, tableName, ranges);
-        while (binnedRanges == null) {
-          // Some tablets were still online, try again
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-          binnedRanges = binOfflineTable(context, tableName, ranges);
-        }
-      } else {
-        Instance instance = getInstance(context);
-        String tableId = null;
-        tl = getTabletLocator(context);
-        // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
-        tl.invalidateCache();
-        while (!tl.binRanges(new Credentials(getPrincipal(context), getAuthenticationToken(context)), ranges, binnedRanges).isEmpty()) {
-          if (!(instance instanceof MockInstance)) {
-            if (tableId == null)
-              tableId = Tables.getTableId(instance, tableName);
-            if (!Tables.exists(instance, tableId))
-              throw new TableDeletedException(tableId);
-            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-              throw new TableOfflineException(instance, tableId);
+  public List<InputSplit> getSplits(JobContext conf) throws IOException {
+    log.setLevel(getLogLevel(conf));
+    validateOptions(conf);
+  
+    LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
+    List<TableQueryConfig> tableConfigs = getTableQueryConfigs(conf);
+    for (TableQueryConfig tableConfig : tableConfigs) {
+    
+      boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
+      String tableId = null;
+      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
+      if (ranges.isEmpty()) {
+        ranges = new ArrayList<Range>(1);
+        ranges.add(new Range());
+      }
+    
+      // get the metadata information for these ranges
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+      TabletLocator tl;
+      try {
+        if (tableConfig.isOfflineScan()) {
+          binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges);
+          while (binnedRanges == null) {
+            // Some tablets were still online, try again
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges);
+          
           }
-          binnedRanges.clear();
-          log.warn("Unable to locate bins for specified ranges. Retrying.");
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+        } else {
+          Instance instance = getInstance(conf);
+          tl = getTabletLocator(conf, tableConfig.getTableName());
+          // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
           tl.invalidateCache();
+          Credentials creds = new Credentials(getPrincipal(conf), getAuthenticationToken(conf));
+        
+          while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
+            if (!(instance instanceof MockInstance)) {
+              if (!Tables.exists(instance, tableId))
+                throw new TableDeletedException(tableId);
+              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+                throw new TableOfflineException(instance, tableId);
+              tableId = Tables.getTableId(instance, tableConfig.getTableName());
+            }
+            binnedRanges.clear();
+            log.warn("Unable to locate bins for specified ranges. Retrying.");
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            tl.invalidateCache();
+          }
         }
+      } catch (Exception e) {
+        throw new IOException(e);
       }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
-    HashMap<Range,ArrayList<String>> splitsToAdd = null;
-
-    if (!autoAdjust)
-      splitsToAdd = new HashMap<Range,ArrayList<String>>();
-
-    HashMap<String,String> hostNameCache = new HashMap<String,String>();
-
-    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-      String ip = tserverBin.getKey().split(":", 2)[0];
-      String location = hostNameCache.get(ip);
-      if (location == null) {
-        InetAddress inetAddress = InetAddress.getByName(ip);
-        location = inetAddress.getHostName();
-        hostNameCache.put(ip, location);
-      }
-
-      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-        Range ke = extentRanges.getKey().toDataRange();
-        for (Range r : extentRanges.getValue()) {
-          if (autoAdjust) {
-            // divide ranges into smaller ranges, based on the tablets
-            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
-          } else {
-            // don't divide ranges
-            ArrayList<String> locations = splitsToAdd.get(r);
-            if (locations == null)
-              locations = new ArrayList<String>(1);
-            locations.add(location);
-            splitsToAdd.put(r, locations);
+    
+      HashMap<Range,ArrayList<String>> splitsToAdd = null;
+    
+      if (!autoAdjust)
+        splitsToAdd = new HashMap<Range,ArrayList<String>>();
+    
+      HashMap<String,String> hostNameCache = new HashMap<String,String>();
+      for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+        String ip = tserverBin.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getHostName();
+          hostNameCache.put(ip, location);
+        }
+        for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+          Range ke = extentRanges.getKey().toDataRange();
+          for (Range r : extentRanges.getValue()) {
+            if (autoAdjust) {
+              // divide ranges into smaller ranges, based on the tablets
+              splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, ke.clip(r), new String[] {location}));
+            } else {
+              // don't divide ranges
+              ArrayList<String> locations = splitsToAdd.get(r);
+              if (locations == null)
+                locations = new ArrayList<String>(1);
+              locations.add(location);
+              splitsToAdd.put(r, locations);
+            }
           }
         }
       }
+    
+      if (!autoAdjust)
+        for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
+          splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, entry.getKey(), entry.getValue().toArray(new String[0])));
     }
-
-    if (!autoAdjust)
-      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
-        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
     return splits;
   }
 
@@ -870,30 +979,53 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static class RangeInputSplit extends InputSplit implements Writable {
     private Range range;
     private String[] locations;
-
+    private String tableId;
+    private String tableName;
+  
     public RangeInputSplit() {
       range = new Range();
       locations = new String[0];
+      tableId = "";
+      tableName = "";
     }
-
+  
     public RangeInputSplit(RangeInputSplit split) throws IOException {
       this.setRange(split.getRange());
       this.setLocations(split.getLocations());
+      this.setTableName(split.getTableName());
     }
-
-    protected RangeInputSplit(String table, Range range, String[] locations) {
+  
+    protected RangeInputSplit(String table, String tableId, Range range, String[] locations) {
       this.range = range;
       this.locations = locations;
+      this.tableName = table;
+      this.tableId = tableId;
     }
-
+  
     public Range getRange() {
       return range;
     }
-
+  
     public void setRange(Range range) {
       this.range = range;
     }
-
+  
+    public String getTableName() {
+      return tableName;
+    }
+  
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+  
+    public void setTableId(String tableId) {
+      this.tableId = tableId;
+    }
+  
+    public String getTableId() {
+      return tableId;
+    }
+  
     private static byte[] extractBytes(ByteSequence seq, int numBytes) {
       byte[] bytes = new byte[numBytes + 1];
       bytes[0] = 0;
@@ -905,7 +1037,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       }
       return bytes;
     }
-
+  
     public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
       int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
       BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
@@ -913,7 +1045,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
       return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
     }
-
+  
     public float getProgress(Key currentKey) {
       if (currentKey == null)
         return 0f;
@@ -932,7 +1064,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       // if we can't figure it out, then claim no progress
       return 0f;
     }
-
+  
     /**
      * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value.
      */
@@ -942,41 +1074,43 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
       int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
       long diff = 0;
-
+    
       byte[] start = startRow.getBytes();
       byte[] stop = stopRow.getBytes();
       for (int i = 0; i < maxCommon; ++i) {
         diff |= 0xff & (start[i] ^ stop[i]);
         diff <<= Byte.SIZE;
       }
-
+    
       if (startRow.getLength() != stopRow.getLength())
         diff |= 0xff;
-
+    
       return diff + 1;
     }
-
+  
     @Override
     public String[] getLocations() throws IOException {
       return locations;
     }
-
+  
     public void setLocations(String[] locations) {
       this.locations = locations;
     }
-
+  
     @Override
     public void readFields(DataInput in) throws IOException {
       range.readFields(in);
+      tableName = in.readUTF();
       int numLocs = in.readInt();
       locations = new String[numLocs];
       for (int i = 0; i < numLocs; ++i)
         locations[i] = in.readUTF();
     }
-
+  
     @Override
     public void write(DataOutput out) throws IOException {
       range.write(out);
+      out.writeUTF(tableName);
       out.writeInt(locations.length);
       for (int i = 0; i < locations.length; ++i)
         out.writeUTF(locations[i]);
@@ -994,5 +1128,4 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       throw new RuntimeException(e);
     }
   }
-
 }


Mime
View raw message