accumulo-commits mailing list archives

From cjno...@apache.org
Subject [2/5] git commit: Adding a new TableQueryConfig object for setting scan information for multiple tables in InputFormatBase
Date Sun, 29 Sep 2013 03:28:58 GMT
Adding a new TableQueryConfig object for setting scan information for multiple tables in InputFormatBase
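
For orientation, a minimal sketch of configuring one job against multiple tables with the new object. This is not code from this commit: it assumes TableQueryConfig takes the table name in its constructor and exposes a setRanges(...) mutator (the actual signatures are in the new TableQueryConfig.java, not shown in this message), and it calls setTableQueryConfigurations(...) as if the concrete input format exposed it publicly, whereas this patch declares it protected on InputFormatBase.

----------------------------------------------------------------------
import java.util.Collections;

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.conf.TableQueryConfig;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MultiTableJobSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());

    // One TableQueryConfig per input table; the constructor and setter
    // shapes here are assumptions, not taken from this diff.
    TableQueryConfig first = new TableQueryConfig("table1");
    first.setRanges(Collections.singletonList(new Range("a", "m")));

    TableQueryConfig second = new TableQueryConfig("table2");
    second.setRanges(Collections.singletonList(new Range("n", "z")));

    // Replaces the now-deprecated single-table setInputTableName/setRanges
    // calls; getSplits() then emits splits per table, each carrying its
    // table name and id (see the RangeInputSplit changes below).
    AccumuloInputFormat.setTableQueryConfigurations(job, first, second);
  }
}
----------------------------------------------------------------------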


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d41fdb19
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d41fdb19
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d41fdb19

Branch: refs/heads/ACCUMULO-391
Commit: d41fdb190ad31ef46b964d11f3b5ca2b7401b78d
Parents: acba59b
Author: Corey J. Nolet <cjnolet@gmail.com>
Authored: Sat Sep 28 21:58:40 2013 -0400
Committer: Corey J. Nolet <cjnolet@gmail.com>
Committed: Sat Sep 28 21:58:40 2013 -0400

----------------------------------------------------------------------
 .../core/client/impl/thrift/ClientService.java  |  16 +-
 .../thrift/ThriftTableOperationException.java   |   4 +-
 .../core/client/mapred/InputFormatBase.java     |  51 ++-
 .../core/client/mapreduce/InputFormatBase.java  | 397 +++++++++++++------
 .../mapreduce/lib/util/InputConfigurator.java   | 226 ++++++-----
 .../accumulo/core/conf/TableQueryConfig.java    | 199 ++++++++++
 .../core/master/thrift/MasterClientService.java |  16 +-
 .../accumulo/core/util/ArgumentChecker.java     |   5 +
 .../client/mapred/AccumuloInputFormatTest.java  |  10 +-
 .../mapreduce/AccumuloInputFormatTest.java      | 260 +++++++++---
 .../core/conf/TableQueryConfigTest.java         |  93 +++++
 11 files changed, 965 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java b/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java
index 488e065..f44d4a6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java
@@ -17751,7 +17751,7 @@ import org.slf4j.LoggerFactory;
             return CREDENTIALS;
           case 2: // PRINCIPAL
             return PRINCIPAL;
-          case 3: // TABLE_NAME
+          case 3: // TABLE
             return TABLE_NAME;
           case 4: // TBL_PERM
             return TBL_PERM;
@@ -18329,7 +18329,7 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
-            case 3: // TABLE_NAME
+            case 3: // TABLE
               if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
                 struct.tableName = iprot.readString();
                 struct.setTableNameIsSet(true);
@@ -21107,7 +21107,7 @@ import org.slf4j.LoggerFactory;
             return CREDENTIALS;
           case 2: // PRINCIPAL
             return PRINCIPAL;
-          case 3: // TABLE_NAME
+          case 3: // TABLE
             return TABLE_NAME;
           case 4: // PERMISSION
             return PERMISSION;
@@ -21685,7 +21685,7 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
-            case 3: // TABLE_NAME
+            case 3: // TABLE
               if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
                 struct.tableName = iprot.readString();
                 struct.setTableNameIsSet(true);
@@ -22327,7 +22327,7 @@ import org.slf4j.LoggerFactory;
             return CREDENTIALS;
           case 2: // PRINCIPAL
             return PRINCIPAL;
-          case 3: // TABLE_NAME
+          case 3: // TABLE
             return TABLE_NAME;
           case 4: // PERMISSION
             return PERMISSION;
@@ -22905,7 +22905,7 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
-            case 3: // TABLE_NAME
+            case 3: // TABLE
               if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
                 struct.tableName = iprot.readString();
                 struct.setTableNameIsSet(true);
@@ -24536,7 +24536,7 @@ import org.slf4j.LoggerFactory;
             return TINFO;
           case 3: // CREDENTIALS
             return CREDENTIALS;
-          case 2: // TABLE_NAME
+          case 2: // TABLE
             return TABLE_NAME;
           default:
             return null;
@@ -24960,7 +24960,7 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
-            case 2: // TABLE_NAME
+            case 2: // TABLE
               if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
                 struct.tableName = iprot.readString();
                 struct.setTableNameIsSet(true);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ThriftTableOperationException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ThriftTableOperationException.java b/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ThriftTableOperationException.java
index 3863e75..dd2e192 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ThriftTableOperationException.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ThriftTableOperationException.java
@@ -106,7 +106,7 @@ import org.slf4j.LoggerFactory;
       switch(fieldId) {
         case 1: // TABLE_ID
           return TABLE_ID;
-        case 2: // TABLE_NAME
+        case 2: // TABLE
           return TABLE_NAME;
         case 3: // OP
           return OP;
@@ -680,7 +680,7 @@ import org.slf4j.LoggerFactory;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
-          case 2: // TABLE_NAME
+          case 2: // TABLE
             if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
               struct.tableName = iprot.readString();
               struct.setTableNameIsSet(true);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
index c796cd2..ea6bfbc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -132,7 +133,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
    */
   protected static Boolean isConnectorInfoSet(JobConf job) {
-    return InputConfigurator.isConnectorInfoSet(CLASS, job);
+    return InputConfigurator.isConnectorInfoSet(CLASS,job);
   }
 
   /**
@@ -275,7 +276,11 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @see #setInputTableName(JobConf, String)
    */
   protected static String getInputTableName(JobConf job) {
-    return InputConfigurator.getInputTableName(CLASS, job);
+    String[] tableNames = InputConfigurator.getInputTableNames(CLASS, job);
+    if (tableNames.length > 0)
+      return tableNames[0];
+    else
+      return null;
   }
 
   /**
@@ -329,7 +334,12 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @see #setRanges(JobConf, Collection)
    */
   protected static List<Range> getRanges(JobConf job) throws IOException {
-    return InputConfigurator.getRanges(CLASS, job);
+    Map<String, List<Range>> tableRanges = InputConfigurator.getRanges(CLASS,job);
+    List<Range> ranges = tableRanges.get(getInputTableName(job));
+    if(ranges != null)
+      return ranges;
+    else
+      return new LinkedList<Range>();
   }
 
   /**
@@ -356,7 +366,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @see #fetchColumns(JobConf, Collection)
    */
   protected static Set<Pair<Text,Text>> getFetchedColumns(JobConf job) {
-    return InputConfigurator.getFetchedColumns(CLASS, job);
+    return InputConfigurator.getFetchedColumns(CLASS, job, getInputTableName(job));
   }
 
   /**
@@ -382,7 +392,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @see #addIterator(JobConf, IteratorSetting)
    */
   protected static List<IteratorSetting> getIterators(JobConf job) {
-    return InputConfigurator.getIterators(CLASS, job);
+    return InputConfigurator.getDefaultIterators(CLASS,job);
   }
 
   /**
@@ -533,8 +543,8 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  protected static TabletLocator getTabletLocator(JobConf job) throws TableNotFoundException {
-    return InputConfigurator.getTabletLocator(CLASS, job);
+  protected static TabletLocator getTabletLocator(JobConf job, String tableName) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, job, tableName);
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -760,9 +770,12 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
     log.setLevel(getLogLevel(job));
     validateOptions(job);
 
-    String tableName = getInputTableName(job);
     boolean autoAdjust = getAutoAdjustRanges(job);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job)) : getRanges(job);
+    List<Range> tablesRanges = getRanges(job);
+    LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
+
+    String tableName = getInputTableName(job);
+    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tablesRanges) : tablesRanges;
 
     if (ranges.isEmpty()) {
       ranges = new ArrayList<Range>(1);
@@ -772,6 +785,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
     // get the metadata information for these ranges
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     TabletLocator tl;
+    String tableId = null;
     try {
       if (isOfflineScan(job)) {
         binnedRanges = binOfflineTable(job, tableName, ranges);
@@ -782,13 +796,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
         }
       } else {
         Instance instance = getInstance(job);
-        String tableId = null;
-        tl = getTabletLocator(job);
+        tl = getTabletLocator(job, tableName);
         // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
         tl.invalidateCache();
-        while (!tl.binRanges(new Credentials(getPrincipal(job), getAuthenticationToken(job)), ranges, binnedRanges).isEmpty()) {
+        Credentials creds = new Credentials(getPrincipal(job), AuthenticationTokenSerializer.deserialize(getTokenClass(job), getToken(job)));
+
+        while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
           if (!(instance instanceof MockInstance)) {
-            if (tableId == null)
               tableId = Tables.getTableId(instance, tableName);
             if (!Tables.exists(instance, tableId))
               throw new TableDeletedException(tableId);
@@ -804,8 +818,6 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
     } catch (Exception e) {
       throw new IOException(e);
     }
-
-    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
     HashMap<Range,ArrayList<String>> splitsToAdd = null;
 
     if (!autoAdjust)
@@ -827,7 +839,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
         for (Range r : extentRanges.getValue()) {
           if (autoAdjust) {
             // divide ranges into smaller ranges, based on the tablets
-            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
+            splits.add(new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location}));
           } else {
             // don't divide ranges
             ArrayList<String> locations = splitsToAdd.get(r);
@@ -842,7 +854,8 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
 
     if (!autoAdjust)
       for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
-        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
+        splits.add(new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0])));
+
     return splits.toArray(new InputSplit[splits.size()]);
   }
 
@@ -859,8 +872,8 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       super(split);
     }
 
-    protected RangeInputSplit(String table, Range range, String[] locations) {
-      super(table, range, locations);
+    protected RangeInputSplit(String table, String tableId, Range range, String[] locations) {
+      super(table, tableId,  range, locations);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 13f9708..04ff718 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -16,6 +16,9 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -26,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -50,7 +54,7 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.conf.TableQueryConfig;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -189,7 +193,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setConnectorInfo(Job, String, String)
    */
   protected static AuthenticationToken getAuthenticationToken(JobContext context) {
-    return InputConfigurator.getAuthenticationToken(CLASS, getConfiguration(context));
+    return InputConfigurator.getAuthenticationToken(CLASS,getConfiguration(context));
   }
 
   /**
@@ -231,7 +235,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setMockInstance(Job, String)
    */
   protected static Instance getInstance(JobContext context) {
-    return InputConfigurator.getInstance(CLASS, getConfiguration(context));
+    return InputConfigurator.getInstance(CLASS,getConfiguration(context));
   }
 
   /**
@@ -244,7 +248,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setLogLevel(Job job, Level level) {
-    InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
+    InputConfigurator.setLogLevel(CLASS,job.getConfiguration(),level);
   }
 
   /**
@@ -261,7 +265,8 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   }
 
   /**
-   * Sets the name of the input table, over which this job will scan.
+   * Sets the name of the input table, over which this job will scan. This method has been deprecated in favor of
+   * {@link InputFormatBase#setInputTableNames(org.apache.hadoop.mapreduce.Job, java.util.Collection)}
    * 
    * @param job
    *          the Hadoop job instance to be configured
@@ -269,26 +274,28 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the table to use when the tablename is null in the write call
    * @since 1.5.0
    */
+  @Deprecated
   public static void setInputTableName(Job job, String tableName) {
-    InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
+    InputConfigurator.setInputTableName(CLASS,job.getConfiguration(),tableName);
   }
 
   /**
-   * Gets the table name from the configuration.
+   * Sets the names of the input tables over which this job will scan.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the table name
-   * @since 1.5.0
-   * @see #setInputTableName(Job, String)
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param tableNames
+   *          the names of the tables over which this job will scan
+   * @since 1.6.0
    */
-  protected static String getInputTableName(JobContext context) {
-    return InputConfigurator.getInputTableName(CLASS, getConfiguration(context));
+  @Deprecated
+  public static void setInputTableNames(Job job, Collection<String> tableNames) {
+    InputConfigurator.setInputTableNames(CLASS,job.getConfiguration(),tableNames);
   }
 
   /**
    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
-   * 
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param auths
@@ -296,12 +303,12 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setScanAuthorizations(Job job, Authorizations auths) {
-    InputConfigurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths);
+    InputConfigurator.setScanAuthorizations(CLASS,job.getConfiguration(),auths);
   }
 
   /**
    * Gets the authorizations to set for the scans from the configuration.
-   * 
+   *
    * @param context
    *          the Hadoop context for the configured job
    * @return the Accumulo scan authorizations
@@ -309,19 +316,35 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setScanAuthorizations(Job, Authorizations)
    */
   protected static Authorizations getScanAuthorizations(JobContext context) {
-    return InputConfigurator.getScanAuthorizations(CLASS, getConfiguration(context));
+    return InputConfigurator.getScanAuthorizations(CLASS,getConfiguration(context));
   }
 
   /**
-   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
-   * 
+   * Sets the input ranges to scan for all tables associated with this job. This will be added to any per-table ranges that have been set using
+   * {@link #setRanges(org.apache.hadoop.mapreduce.Job, java.util.Map)}
+   *
    * @param job
    *          the Hadoop job instance to be configured
    * @param ranges
    *          the ranges that will be mapped over
    * @since 1.5.0
    */
+  @Deprecated
   public static void setRanges(Job job, Collection<Range> ranges) {
+    InputConfigurator.setRanges(CLASS,job.getConfiguration(),ranges);
+  }
+
+  /**
+   * Sets the input ranges to scan per-table for this job.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param ranges
+   *          the per-table ranges that will be mapped over
+   * @since 1.6.0
+   */
+  @Deprecated
+  public static void setRanges(Job job, Map<String,Collection<Range>> ranges) {
     InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
   }
 
@@ -335,13 +358,16 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *           if the ranges have been encoded improperly
    * @since 1.5.0
    * @see #setRanges(Job, Collection)
+   * @see #setRanges(org.apache.hadoop.mapreduce.Job, java.util.Map)
    */
-  protected static List<Range> getRanges(JobContext context) throws IOException {
+  @Deprecated
+  protected static Map<String,List<Range>> getRanges(JobContext context) throws IOException {
     return InputConfigurator.getRanges(CLASS, getConfiguration(context));
   }
 
   /**
-   * Restricts the columns that will be mapped over for this job.
+   * Restricts the columns that will be mapped over for this job for all tables. These columns will be added to any per-table columns set with
+   * {@link #fetchColumns(org.apache.hadoop.mapreduce.Job, java.util.Map)}.
    * 
    * @param job
    *          the Hadoop job instance to be configured
@@ -350,25 +376,44 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          selected. An empty set is the default and is equivalent to scanning the all columns.
    * @since 1.5.0
    */
+  @Deprecated
   public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
   }
 
   /**
-   * Gets the columns to be mapped over from this job.
+   * Restricts the columns that will be mapped over for this job, per table.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param columnFamilyColumnQualifierPairs
+   *          a map keyed by table name whose value is a collection of {@link Text} pairs corresponding to column family and column qualifier. If the column
+   *          qualifier is null, the entire column family is selected. An empty set is the default and is equivalent to scanning all the columns.
+   * @since 1.6.0
+   */
+  @Deprecated
+  public static void fetchColumns(Job job, Map<String,Collection<Pair<Text,Text>>> columnFamilyColumnQualifierPairs) {
+    InputConfigurator.fetchColumns(CLASS,job.getConfiguration(),columnFamilyColumnQualifierPairs);
+  }
+
+  /**
+   * Gets the columns to be mapped over from this job. Any default columns as well as per-table columns will be returned.
    * 
    * @param context
    *          the Hadoop context for the configured job
+   * @param table
+   *          the table for which to return the columns
    * @return a set of columns
-   * @since 1.5.0
+   * @since 1.6.0
    * @see #fetchColumns(Job, Collection)
    */
-  protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
-    return InputConfigurator.getFetchedColumns(CLASS, getConfiguration(context));
+  @Deprecated
+  protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context, String table) {
+    return InputConfigurator.getFetchedColumns(CLASS,getConfiguration(context),table);
   }
 
   /**
-   * Encode an iterator on the input for this job.
+   * Encode an iterator on the input, applied by default to all tables in this job.
    * 
    * @param job
    *          the Hadoop job instance to be configured
@@ -376,12 +421,47 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the configuration of the iterator
    * @since 1.5.0
    */
+  @Deprecated
   public static void addIterator(Job job, IteratorSetting cfg) {
-    InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
+    InputConfigurator.addIterator(CLASS,job.getConfiguration(),cfg);
   }
 
   /**
-   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
+   * Encode an iterator on the input for this job for the specified table.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param table
+   *          the table for which to add the iterator
+   * @param cfg
+   *          the configuration of the iterator
+   * @since 1.6.0
+   */
+  @Deprecated
+  public static void addIterator(Job job, String table, IteratorSetting cfg) {
+    InputConfigurator.addIterator(CLASS, job.getConfiguration(), table, cfg);
+  }
+
+  /**
+   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration for the specified table. Any default iterators will be
+   * included in the return.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @param table
+   *          the table for which to return the iterators
+   * @return a list of iterators for the given table
+   * @since 1.6.0
+   * @see #addIterator(Job, String, IteratorSetting)
+   */
+  @Deprecated
+  protected static List<IteratorSetting> getIterators(JobContext context, String table) {
+    return InputConfigurator.getIterators(CLASS, getConfiguration(context), table);
+  }
+
+  /**
+   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration. This will only return iterators that have not been set
+   * for a specific table.
    * 
    * @param context
    *          the Hadoop context for the configured job
@@ -389,8 +469,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @since 1.5.0
    * @see #addIterator(Job, IteratorSetting)
    */
+  @Deprecated
   protected static List<IteratorSetting> getIterators(JobContext context) {
-    return InputConfigurator.getIterators(CLASS, getConfiguration(context));
+    return InputConfigurator.getDefaultIterators(CLASS,getConfiguration(context));
   }
 
   /**
@@ -407,6 +488,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setRanges(Job, Collection)
    * @since 1.5.0
    */
+  @Deprecated
   public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
     InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
   }
@@ -420,10 +502,25 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @since 1.5.0
    * @see #setAutoAdjustRanges(Job, boolean)
    */
+  @Deprecated
   protected static boolean getAutoAdjustRanges(JobContext context) {
     return InputConfigurator.getAutoAdjustRanges(CLASS, getConfiguration(context));
   }
 
+
+  protected static void setTableQueryConfigurations(JobContext job, TableQueryConfig... configs) {
+    checkNotNull(configs);
+    InputConfigurator.setTableQueryConfiguration(CLASS, getConfiguration(job), configs);
+  }
+
+  public static List<TableQueryConfig> getTableQueryConfigurations(JobContext job) {
+    return InputConfigurator.getTableQueryConfigurations(CLASS, getConfiguration(job));
+  }
+
+  protected static TableQueryConfig getTableQueryConfiguration(JobContext job, String tableName) {
+    return InputConfigurator.getTableQueryConfiguration(CLASS, getConfiguration(job), tableName);
+  }
+
   /**
    * Controls the use of the {@link IsolatedScanner} in this job.
    * 
@@ -536,13 +633,15 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * 
    * @param context
    *          the Hadoop context for the configured job
+   * @param table
+   *          the table for which to initialize the locator
    * @return an Accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
-    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context));
+  protected static TabletLocator getTabletLocator(JobContext context, String table) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), table);
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -577,42 +676,60 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     protected RangeInputSplit split;
 
     /**
-     * Apply the configured iterators from the configuration to the scanner.
+     * Apply the configured iterators from the configuration to the scanner. This applies both the
+     * default iterators and the per-table iterators.
      * 
      * @param context
      *          the Hadoop context for the configured job
      * @param scanner
      *          the scanner to configure
+     * @param tableName
+     *          the table name for which to set up the iterators
      */
-    protected void setupIterators(TaskAttemptContext context, Scanner scanner) {
-      List<IteratorSetting> iterators = getIterators(context);
-      for (IteratorSetting iterator : iterators) {
+    protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName) {
+      List<IteratorSetting> iterators = getIterators(context, tableName); // default iterators will be included
+      for (IteratorSetting iterator : iterators)
         scanner.addScanIterator(iterator);
-      }
     }
 
+
     /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
     @Override
     public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
+
       Scanner scanner;
       split = (RangeInputSplit) inSplit;
-      log.debug("Initializing input split: " + split.range);
+      log.debug("Initializing input split: " + split.getRange());
       Instance instance = getInstance(attempt);
       String principal = getPrincipal(attempt);
+
+      TableQueryConfig tableConfig = getTableQueryConfiguration(attempt, split.getTableName());
+
+      // in case the table name changed since the job was configured, the previous name still works for configuration lookups,
+      // but the scanner will need to reference the table by its current name.
+      String actualNameForId = null;
+      try{
+        actualNameForId = Tables.getTableName(instance, split.getTableId());
+        if(!actualNameForId.equals(split.getTableName()))
+          log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
+      }catch(TableNotFoundException e){
+        throw new IOException("The specified table was not found for id=" + split.getTableId());
+      }
+
       AuthenticationToken token = getAuthenticationToken(attempt);
       Authorizations authorizations = getScanAuthorizations(attempt);
-
       try {
         log.debug("Creating connector with user: " + principal);
+
         Connector conn = instance.getConnector(principal, token);
-        log.debug("Creating scanner for table: " + getInputTableName(attempt));
+        log.debug("Creating scanner for table: " + actualNameForId);
         log.debug("Authorizations are: " + authorizations);
         if (isOfflineScan(attempt)) {
-          scanner = new OfflineScanner(instance, new Credentials(principal, token), Tables.getTableId(instance, getInputTableName(attempt)), authorizations);
+          scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
         } else {
-          scanner = conn.createScanner(getInputTableName(attempt), authorizations);
+          scanner = conn.createScanner(actualNameForId, authorizations);
         }
         if (isIsolated(attempt)) {
           log.info("Creating isolated scanner");
@@ -622,13 +739,13 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupIterators(attempt, scanner);
+        setupIterators(attempt, scanner, split.getTableName());
       } catch (Exception e) {
         throw new IOException(e);
       }
 
       // setup a scanner within the bounds of this split
-      for (Pair<Text,Text> c : getFetchedColumns(attempt)) {
+      for (Pair<Text,Text> c : tableConfig.getColumns()) {
         if (c.getSecond() != null) {
           log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
           scanner.fetchColumn(c.getFirst(), c.getSecond());
@@ -638,8 +755,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
         }
       }
 
-      scanner.setRange(split.range);
-
+      scanner.setRange(split.getRange());
       numKeysRead = 0;
 
       // do this last after setting all scanner options
@@ -705,9 +821,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       scanner.setRange(metadataRange);
 
       RowIterator rowIter = new RowIterator(scanner);
-
       KeyExtent lastExtent = null;
-
       while (rowIter.hasNext()) {
         Iterator<Entry<Key,Value>> row = rowIter.next();
         String last = "";
@@ -771,96 +885,104 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   }
 
   /**
-   * Read the metadata table to get tablets and match up ranges to them.
+   * Gets the splits of the tables that have been set on the job.
+   *
+   * @param conf
+   *          the Hadoop job context
+   * @return the splits for the configured tables, based on their ranges
+   * @throws IOException
+   *          if a table set on the job doesn't exist or an error occurs
+   *          initializing the tablet locator
    */
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException {
-    log.setLevel(getLogLevel(context));
-    validateOptions(context);
-
-    String tableName = getInputTableName(context);
-    boolean autoAdjust = getAutoAdjustRanges(context);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
+  public List<InputSplit> getSplits(JobContext conf) throws IOException {
+    log.setLevel(getLogLevel(conf));
+    validateOptions(conf);
+
+    LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
+    List<TableQueryConfig> tableConfigs = getTableQueryConfigurations(conf);
+    for (TableQueryConfig tableConfig : tableConfigs) {
+
+      boolean autoAdjust = getAutoAdjustRanges(conf); // TODO: Put this in the table config object
+      String tableId = null;
+      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
+      if (ranges.isEmpty()) {
+        ranges = new ArrayList<Range>(1);
+        ranges.add(new Range());
+      }
 
-    if (ranges.isEmpty()) {
-      ranges = new ArrayList<Range>(1);
-      ranges.add(new Range());
-    }
+      // get the metadata information for these ranges
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+      TabletLocator tl;
+      try {
+        if (isOfflineScan(conf)) {
+          binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges);
+          while (binnedRanges == null) {
+            // Some tablets were still online, try again
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges);
 
-    // get the metadata information for these ranges
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    TabletLocator tl;
-    try {
-      if (isOfflineScan(context)) {
-        binnedRanges = binOfflineTable(context, tableName, ranges);
-        while (binnedRanges == null) {
-          // Some tablets were still online, try again
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-          binnedRanges = binOfflineTable(context, tableName, ranges);
-        }
-      } else {
-        Instance instance = getInstance(context);
-        String tableId = null;
-        tl = getTabletLocator(context);
-        // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
-        tl.invalidateCache();
-        while (!tl.binRanges(new Credentials(getPrincipal(context), getAuthenticationToken(context)), ranges, binnedRanges).isEmpty()) {
-          if (!(instance instanceof MockInstance)) {
-            if (tableId == null)
-              tableId = Tables.getTableId(instance, tableName);
-            if (!Tables.exists(instance, tableId))
-              throw new TableDeletedException(tableId);
-            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-              throw new TableOfflineException(instance, tableId);
           }
-          binnedRanges.clear();
-          log.warn("Unable to locate bins for specified ranges. Retrying.");
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+        } else {
+          Instance instance = getInstance(conf);
+          tl = getTabletLocator(conf, tableConfig.getTableName());
+          // it's possible that the cache contains complete but outdated information about a table's tablets... so clear it
           tl.invalidateCache();
+          Credentials creds = new Credentials(getPrincipal(conf), getAuthenticationToken(conf));
+
+          while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
+            if (!(instance instanceof MockInstance)) {
+              if (!Tables.exists(instance, tableId))
+                throw new TableDeletedException(tableId);
+              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+                throw new TableOfflineException(instance, tableId);
+              tableId = Tables.getTableId(instance, tableConfig.getTableName());
+            }
+            binnedRanges.clear();
+            log.warn("Unable to locate bins for specified ranges. Retrying.");
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            tl.invalidateCache();
+          }
         }
+      } catch (Exception e) {
+        throw new IOException(e);
       }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
-    HashMap<Range,ArrayList<String>> splitsToAdd = null;
 
-    if (!autoAdjust)
-      splitsToAdd = new HashMap<Range,ArrayList<String>>();
+      HashMap<Range,ArrayList<String>> splitsToAdd = null;
 
-    HashMap<String,String> hostNameCache = new HashMap<String,String>();
+      if (!autoAdjust)
+        splitsToAdd = new HashMap<Range,ArrayList<String>>();
 
-    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-      String ip = tserverBin.getKey().split(":", 2)[0];
-      String location = hostNameCache.get(ip);
-      if (location == null) {
-        InetAddress inetAddress = InetAddress.getByName(ip);
-        location = inetAddress.getHostName();
-        hostNameCache.put(ip, location);
-      }
-
-      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-        Range ke = extentRanges.getKey().toDataRange();
-        for (Range r : extentRanges.getValue()) {
-          if (autoAdjust) {
-            // divide ranges into smaller ranges, based on the tablets
-            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
-          } else {
-            // don't divide ranges
-            ArrayList<String> locations = splitsToAdd.get(r);
-            if (locations == null)
-              locations = new ArrayList<String>(1);
-            locations.add(location);
-            splitsToAdd.put(r, locations);
+      HashMap<String,String> hostNameCache = new HashMap<String,String>();
+      for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+        String ip = tserverBin.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getHostName();
+          hostNameCache.put(ip, location);
+        }
+        for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+          Range ke = extentRanges.getKey().toDataRange();
+          for (Range r : extentRanges.getValue()) {
+            if (autoAdjust) {
+              // divide ranges into smaller ranges, based on the tablets
+              splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, ke.clip(r), new String[] {location}));
+            } else {
+              // don't divide ranges
+              ArrayList<String> locations = splitsToAdd.get(r);
+              if (locations == null)
+                locations = new ArrayList<String>(1);
+              locations.add(location);
+              splitsToAdd.put(r, locations);
+            }
           }
         }
       }
-    }
 
-    if (!autoAdjust)
-      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
-        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
+      if (!autoAdjust)
+        for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
+          splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, entry.getKey(), entry.getValue().toArray(new String[0])));
+    }
     return splits;
   }
 
@@ -870,20 +992,27 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static class RangeInputSplit extends InputSplit implements Writable {
     private Range range;
     private String[] locations;
+    private String tableId;
+    private String tableName;
 
     public RangeInputSplit() {
       range = new Range();
       locations = new String[0];
+      tableId = "";
+      tableName = "";
     }
 
     public RangeInputSplit(RangeInputSplit split) throws IOException {
       this.setRange(split.getRange());
       this.setLocations(split.getLocations());
+      this.setTableName(split.getTableName());
     }
 
-    protected RangeInputSplit(String table, Range range, String[] locations) {
+    protected RangeInputSplit(String table, String tableId, Range range, String[] locations) {
       this.range = range;
       this.locations = locations;
+      this.tableName = table;
+      this.tableId = tableId;
     }
 
     public Range getRange() {
@@ -894,6 +1023,22 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       this.range = range;
     }
 
+    public String getTableName() {
+      return tableName;
+    }
+
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+
+    public void setTableId(String tableId) {
+      this.tableId = tableId;
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+
     private static byte[] extractBytes(ByteSequence seq, int numBytes) {
       byte[] bytes = new byte[numBytes + 1];
       bytes[0] = 0;
@@ -968,6 +1113,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     @Override
     public void readFields(DataInput in) throws IOException {
       range.readFields(in);
+      tableName = in.readUTF();
       int numLocs = in.readInt();
       locations = new String[numLocs];
       for (int i = 0; i < numLocs; ++i)
@@ -977,6 +1123,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     @Override
     public void write(DataOutput out) throws IOException {
       range.write(out);
+      out.writeUTF(tableName);
       out.writeInt(locations.length);
       for (int i = 0; i < locations.length; ++i)
         out.writeUTF(locations[i]);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index a1c3f70..29795af 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.core.client.mapreduce.lib.util;
 
+import static org.apache.accumulo.core.util.ArgumentChecker.notNull;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -23,10 +25,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.StringTokenizer;
+import java.util.TreeMap;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -42,11 +42,11 @@ import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mock.MockTabletLocator;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.TableQueryConfig;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.commons.codec.binary.Base64;
@@ -65,7 +65,7 @@ public class InputConfigurator extends ConfiguratorBase {
    * @since 1.5.0
    */
   public static enum ScanOpts {
-    TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS
+    TABLE, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS
   }
 
   /**
@@ -87,25 +87,27 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param tableName
    *          the table to use when the tablename is null in the write call
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setInputTableName(Class<?> implementingClass, Configuration conf, String tableName) {
-    ArgumentChecker.notNull(tableName);
-    conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName);
+    notNull(tableName);
+    conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE), tableName);
   }
 
   /**
-   * Gets the table name from the configuration.
-   * 
+   * Gets the name of the input table from the configuration.
+   *
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
    * @param conf
    *          the Hadoop configuration object to configure
-   * @return the table name
    * @since 1.5.0
-   * @see #setInputTableName(Class, Configuration, String)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static String getInputTableName(Class<?> implementingClass, Configuration conf) {
-    return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME));
+    return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE));
   }
 
   /**
@@ -141,7 +143,7 @@ public class InputConfigurator extends ConfiguratorBase {
   }
 
   /**
-   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
+   * Sets the input ranges to scan on all input tables for this job. If not set, each table will be scanned in its entirety.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -149,21 +151,26 @@ public class InputConfigurator extends ConfiguratorBase {
    *          the Hadoop configuration object to configure
    * @param ranges
    *          the ranges that will be mapped over
+   * @throws IllegalArgumentException
+   *          if the ranges cannot be encoded into Base64
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setRanges(Class<?> implementingClass, Configuration conf, Collection<Range> ranges) {
-    ArgumentChecker.notNull(ranges);
+    notNull(ranges);
+
     ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
     try {
       for (Range r : ranges) {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         r.write(new DataOutputStream(baos));
-        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()), Constants.UTF8));
+        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
       }
+      conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
     } catch (IOException ex) {
       throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
     }
-    conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
   }
 
   /**
@@ -176,13 +183,17 @@ public class InputConfigurator extends ConfiguratorBase {
    * @return the ranges
    * @throws IOException
    *           if the ranges have been encoded improperly
-   * @since 1.5.0
+   * @since 1.6.0
+   * @deprecated since 1.6.0
    * @see #setRanges(Class, Configuration, Collection)
    */
+  @Deprecated
   public static List<Range> getRanges(Class<?> implementingClass, Configuration conf) throws IOException {
-    ArrayList<Range> ranges = new ArrayList<Range>();
-    for (String rangeString : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES))) {
-      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(Constants.UTF8)));
+
+    Collection<String> encodedRanges = conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES));
+    List<Range> ranges = new ArrayList<Range>();
+    for (String rangeString : encodedRanges) {
+      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
       Range range = new Range();
       range.readFields(new DataInputStream(bais));
       ranges.add(range);
@@ -191,7 +202,7 @@ public class InputConfigurator extends ConfiguratorBase {
   }
 
   /**
-   * Restricts the columns that will be mapped over for this job.
+   * Restricts the columns that will be mapped over for this job. This applies the columns to all tables that have been set on the job.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -200,12 +211,17 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param columnFamilyColumnQualifierPairs
    *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
    *          selected. An empty set is the default and is equivalent to scanning the all columns.
+   * @throws IllegalArgumentException
+   *          if the column family is null
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void fetchColumns(Class<?> implementingClass, Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
-    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
-    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
+    notNull(columnFamilyColumnQualifierPairs);
+    ArrayList<String> columnStrings = new ArrayList<String>();
     for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+
       if (column.getFirst() == null)
         throw new IllegalArgumentException("Column family can not be null");
 
@@ -218,29 +234,7 @@ public class InputConfigurator extends ConfiguratorBase {
   }
 
   /**
-   * Gets the columns to be mapped over from this job.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return a set of columns
-   * @since 1.5.0
-   * @see #fetchColumns(Class, Configuration, Collection)
-   */
-  public static Set<Pair<Text,Text>> getFetchedColumns(Class<?> implementingClass, Configuration conf) {
-    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
-    for (String col : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.COLUMNS))) {
-      int idx = col.indexOf(":");
-      Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(Constants.UTF8)) : Base64.decodeBase64(col.substring(0, idx).getBytes(Constants.UTF8)));
-      Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes()));
-      columns.add(new Pair<Text,Text>(cf, cq));
-    }
-    return columns;
-  }
-
-  /**
-   * Encode an iterator on the input for this job.
+   * Encode an iterator on the input for all tables associated with this job.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -248,8 +242,12 @@ public class InputConfigurator extends ConfiguratorBase {
    *          the Hadoop configuration object to configure
    * @param cfg
    *          the configuration of the iterator
+   * @throws IllegalArgumentException
+   *          if the iterator can't be serialized into the configuration
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void addIterator(Class<?> implementingClass, Configuration conf, IteratorSetting cfg) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     String newIter;
@@ -261,7 +259,8 @@ public class InputConfigurator extends ConfiguratorBase {
       throw new IllegalArgumentException("unable to serialize IteratorSetting");
     }
 
-    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
+    String confKey = enumToConfKey(implementingClass, ScanOpts.ITERATORS);
+    String iterators = conf.get(confKey);
     // No iterators specified yet, create a new string
     if (iterators == null || iterators.isEmpty()) {
       iterators = newIter;
@@ -270,41 +269,7 @@ public class InputConfigurator extends ConfiguratorBase {
       iterators = iterators.concat(StringUtils.COMMA_STR + newIter);
     }
     // Store the iterators w/ the job
-    conf.set(enumToConfKey(implementingClass, ScanOpts.ITERATORS), iterators);
-  }
-
-  /**
-   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return a list of iterators
-   * @since 1.5.0
-   * @see #addIterator(Class, Configuration, IteratorSetting)
-   */
-  public static List<IteratorSetting> getIterators(Class<?> implementingClass, Configuration conf) {
-    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
-
-    // If no iterators are present, return an empty list
-    if (iterators == null || iterators.isEmpty())
-      return new ArrayList<IteratorSetting>();
-
-    // Compose the set of iterators encoded in the job configuration
-    StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR);
-    List<IteratorSetting> list = new ArrayList<IteratorSetting>();
-    try {
-      while (tokens.hasMoreTokens()) {
-        String itstring = tokens.nextToken();
-        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes()));
-        list.add(new IteratorSetting(new DataInputStream(bais)));
-        bais.close();
-      }
-    } catch (IOException e) {
-      throw new IllegalArgumentException("couldn't decode iterator settings");
-    }
-    return list;
+    conf.set(confKey, iterators);
   }
 
   /**
@@ -322,7 +287,9 @@ public class InputConfigurator extends ConfiguratorBase {
    *          the feature is enabled if true, disabled otherwise
    * @see #setRanges(Class, Configuration, Collection)
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setAutoAdjustRanges(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
     conf.setBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), enableFeature);
   }
@@ -337,7 +304,9 @@ public class InputConfigurator extends ConfiguratorBase {
    * @return false if the feature is disabled, true otherwise
    * @since 1.5.0
    * @see #setAutoAdjustRanges(Class, Configuration, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static Boolean getAutoAdjustRanges(Class<?> implementingClass, Configuration conf) {
     return conf.getBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), true);
   }
@@ -355,7 +324,9 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setScanIsolation(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
     conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), enableFeature);
   }
@@ -370,7 +341,9 @@ public class InputConfigurator extends ConfiguratorBase {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setScanIsolation(Class, Configuration, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static Boolean isIsolated(Class<?> implementingClass, Configuration conf) {
     return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), false);
   }
@@ -389,7 +362,9 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setLocalIterators(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
     conf.setBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), enableFeature);
   }
@@ -404,7 +379,9 @@ public class InputConfigurator extends ConfiguratorBase {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setLocalIterators(Class, Configuration, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static Boolean usesLocalIterators(Class<?> implementingClass, Configuration conf) {
     return conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false);
   }
@@ -461,6 +438,59 @@ public class InputConfigurator extends ConfiguratorBase {
     return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), false);
   }
 
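+  /**
+   * Sets the query configurations for multiple tables on this job. Each configuration is serialized and Base64-encoded into a single Hadoop configuration
+   * property, replacing any previously set table configurations.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param tconf
+   *          one or more {@link TableQueryConfig} objects, one per table to read
+   * @since 1.6.0
+   */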
+  public static void setTableQueryConfiguration(Class<?> implementingClass, Configuration conf, TableQueryConfig... tconf) {
+    List<String> tableQueryConfigStrings = new ArrayList<String>();
+    for (TableQueryConfig queryConfig : tconf) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try {
+        queryConfig.write(new DataOutputStream(baos));
+      } catch (IOException e) {
+        throw new IllegalStateException("Configuration for " + queryConfig.getTableName() + " could not be serialized.", e);
+      }
+      tableQueryConfigStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
+    }
+    String confKey = enumToConfKey(implementingClass, ScanOpts.TABLE);
+    conf.setStrings(confKey, tableQueryConfigStrings.toArray(new String[0]));
+  }
+
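+  /**
+   * Gets all {@link TableQueryConfig} objects set on this job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the list of table query configurations, empty if none have been set
+   * @since 1.6.0
+   * @see #setTableQueryConfiguration(Class, Configuration, TableQueryConfig...)
+   */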
+  public static List<TableQueryConfig> getTableQueryConfigurations(Class<?> implementingClass, Configuration conf) {
+    List<TableQueryConfig> configs = new ArrayList<TableQueryConfig>();
+    Collection<String> configStrings = conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.TABLE));
+    if (configStrings != null) {
+      for (String str : configStrings) {
+        try {
+          byte[] bytes = Base64.decodeBase64(str.getBytes(Constants.UTF8));
+          ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+          configs.add(new TableQueryConfig(new DataInputStream(bais)));
+          bais.close();
+        } catch (IOException e) {
+          throw new IllegalStateException("The table query configurations could not be deserialized from the given configuration", e);
+        }
+      }
+    }
+    return configs;
+  }
+
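+  /**
+   * Gets the {@link TableQueryConfig} for the given table, or null if the table has no configuration set on this job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param tableName
+   *          the table for which to fetch the configuration
+   * @return the table's query configuration, or null if not found
+   * @since 1.6.0
+   */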
+  public static TableQueryConfig getTableQueryConfiguration(Class<?> implementingClass, Configuration conf, String tableName) {
+    Collection<String> configStrings = conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.TABLE));
+    if (configStrings != null) {
+      for (String str : configStrings) {
+        try {
+          // each string is a Base64-encoded, serialized TableQueryConfig, so it must be
+          // deserialized before its table name can be compared
+          byte[] bytes = Base64.decodeBase64(str.getBytes(Constants.UTF8));
+          ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+          TableQueryConfig config = new TableQueryConfig(new DataInputStream(bais));
+          bais.close();
+          if (config.getTableName().equals(tableName))
+            return config;
+        } catch (IOException e) {
+          throw new IllegalStateException("The table query configurations could not be deserialized from the given configuration", e);
+        }
+      }
+    }
+    return null;
+  }
+
   /**
    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
    * 
@@ -468,17 +498,18 @@ public class InputConfigurator extends ConfiguratorBase {
    *          the class whose name will be used as a prefix for the property configuration key
    * @param conf
    *          the Hadoop configuration object to configure
+   * @param tableName
+   *          The table name for which to initialize the {@link TabletLocator}
    * @return an Accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
-   * @since 1.5.0
+   * @since 1.6.0
    */
-  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf) throws TableNotFoundException {
+  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf, String tableName) throws TableNotFoundException {
     String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
     if ("MockInstance".equals(instanceType))
       return new MockTabletLocator();
     Instance instance = getInstance(implementingClass, conf);
-    String tableName = getInputTableName(implementingClass, conf);
     return TabletLocator.getLocator(instance, new Text(Tables.getTableId(instance, tableName)));
   }
 
@@ -507,17 +538,26 @@ public class InputConfigurator extends ConfiguratorBase {
       Connector c = getInstance(implementingClass, conf).getConnector(principal, token);
       if (!c.securityOperations().authenticateUser(principal, token))
         throw new IOException("Unable to authenticate user");
-      if (!c.securityOperations().hasTablePermission(principal, getInputTableName(implementingClass, conf), TablePermission.READ))
-        throw new IOException("Unable to access table");
-
-      if (!conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false)) {
-        // validate that any scan-time iterators can be loaded by the the tablet servers
-        for (IteratorSetting iter : getIterators(implementingClass, conf)) {
-          if (!c.tableOperations().testClassLoad(getInputTableName(implementingClass, conf), iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
-            throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
+
+      for (TableQueryConfig tableConfig : getTableQueryConfigurations(implementingClass, conf)) {
+        if (!c.securityOperations().hasTablePermission(principal, tableConfig.getTableName(), TablePermission.READ))
+          throw new IOException("Unable to access table " + tableConfig.getTableName());
+      }
+
+      for (TableQueryConfig tableConfig : getTableQueryConfigurations(implementingClass, conf)) {
+        // when local iterators are enabled, the iterators run client-side, so there is
+        // nothing for the tablet servers to class-load
+        if (!tableConfig.shouldUseLocalIterators() && tableConfig.getIterators() != null) {
+          // validate that any scan-time iterators can be loaded by the tablet servers
+          for (IteratorSetting iter : tableConfig.getIterators()) { // TODO: These iterators need to be separated by table
+            if (!c.tableOperations().testClassLoad(tableConfig.getTableName(), iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
+              throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
+          }
         }
       }
 
+      // TODO: Check for the "default table case"
+
     } catch (AccumuloException e) {
       throw new IOException(e);
     } catch (AccumuloSecurityException e) {

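Taken together, the methods above replace the single-table scan options with one serialized TableQueryConfig per table. A minimal sketch of the intended call pattern (the table names and the use of AccumuloInputFormat as the anchor class are illustrative, not part of this commit):

    import java.util.Collections;

    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
    import org.apache.accumulo.core.conf.TableQueryConfig;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // one TableQueryConfig per table, each carrying its own ranges and scan flags
    TableQueryConfig t1 = new TableQueryConfig("table1")
        .setRanges(Collections.singletonList(new Range("a", "k")))
        .setUseIsolatedScanners(true);
    TableQueryConfig t2 = new TableQueryConfig("table2").setAutoAdjustRanges(false);
    InputConfigurator.setTableQueryConfiguration(AccumuloInputFormat.class, conf, t1, t2);
    // settings can be read back all at once or per table
    TableQueryConfig readBack = InputConfigurator.getTableQueryConfiguration(AccumuloInputFormat.class, conf, "table1");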
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/main/java/org/apache/accumulo/core/conf/TableQueryConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/TableQueryConfig.java b/core/src/main/java/org/apache/accumulo/core/conf/TableQueryConfig.java
new file mode 100644
index 0000000..00f4dc0
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/conf/TableQueryConfig.java
@@ -0,0 +1,199 @@
+package org.apache.accumulo.core.conf;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
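+/**
+ * Holds a single table's scan-time settings (ranges, columns, iterators, and the auto-adjust/local-iterator/isolation flags) for jobs that read from
+ * multiple tables through the input formats.
+ */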
+public class TableQueryConfig implements Writable {
+  
+  private String tableName;
+  private List<IteratorSetting> iterators;
+  private List<Range> ranges;
+  private Set<Pair<Text,Text>> columns;
+  
+  private boolean autoAdjustRanges = true;
+  private boolean useLocalIterators = false;
+  private boolean useIsolatedScanners = false;
+  
+  public TableQueryConfig(String tableName) {
+    checkNotNull(tableName);
+    this.tableName = tableName;
+  }
+
+  public TableQueryConfig(DataInput input) throws IOException {
+    readFields(input);
+  }
+
+  public TableQueryConfig setRanges(List<Range> ranges) {
+    this.ranges = ranges;
+    return this;
+  }
+
+  public TableQueryConfig setColumns(Set<Pair<Text,Text>> columns) {
+    this.columns = columns;
+    return this;
+  }
+
+  public TableQueryConfig setIterators(List<IteratorSetting> iterators) {
+    this.iterators = iterators;
+    return this;
+  }
+
+  public TableQueryConfig setAutoAdjustRanges(boolean autoAdjustRanges) {
+    this.autoAdjustRanges = autoAdjustRanges;
+    return this;
+  }
+
+  public TableQueryConfig setUseLocalIterators(boolean useLocalIterators) {
+    this.useLocalIterators = useLocalIterators;
+    return this;
+  }
+
+  public TableQueryConfig setUseIsolatedScanners(boolean useIsolatedScanners) {
+    this.useIsolatedScanners = useIsolatedScanners;
+    return this;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public List<IteratorSetting> getIterators() {
+    return iterators;
+  }
+
+  public List<Range> getRanges() {
+    return ranges;
+  }
+
+  public Set<Pair<Text,Text>> getColumns() {
+    return columns;
+  }
+
+  public boolean shouldAutoAdjustRanges() {
+    return autoAdjustRanges;
+  }
+
+  public boolean shouldUseLocalIterators() {
+    return useLocalIterators;
+  }
+
+  public boolean shouldUseIsolatedScanners() {
+    return useIsolatedScanners;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeUTF(tableName);
+    if (iterators != null) {
+      dataOutput.writeInt(iterators.size());
+      for (IteratorSetting setting : iterators)
+        setting.write(dataOutput);
+    } else {
+      dataOutput.writeInt(0);
+    }
+    if (ranges != null) {
+      dataOutput.writeInt(ranges.size());
+      for (Range range : ranges)
+        range.write(dataOutput);
+    } else {
+      dataOutput.writeInt(0);
+    }
+    if (columns != null) {
+      dataOutput.writeInt(columns.size());
+      for (Pair<Text,Text> column : columns) {
+        if (column.getSecond() == null) {
+          dataOutput.writeInt(1);
+          column.getFirst().write(dataOutput);
+        } else {
+          dataOutput.writeInt(2);
+          column.getFirst().write(dataOutput);
+          column.getSecond().write(dataOutput);
+        }
+      }
+    } else {
+      dataOutput.writeInt(0);
+    }
+    dataOutput.writeBoolean(autoAdjustRanges);
+    dataOutput.writeBoolean(useLocalIterators);
+    dataOutput.writeBoolean(useIsolatedScanners);
+  }
+  
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    this.tableName = dataInput.readUTF();
+    // load iterators
+    int iterSize = dataInput.readInt();
+    if (iterSize > 0)
+      iterators = new ArrayList<IteratorSetting>();
+    for (int i = 0; i < iterSize; i++)
+      iterators.add(new IteratorSetting(dataInput));
+    // load ranges
+    int rangeSize = dataInput.readInt();
+    if (rangeSize > 0)
+      ranges = new ArrayList<Range>();
+    for (int i = 0; i < rangeSize; i++) {
+      Range range = new Range();
+      range.readFields(dataInput);
+      ranges.add(range);
+    }
+    // load columns
+    int columnSize = dataInput.readInt();
+    if (columnSize > 0)
+      columns = new HashSet<Pair<Text,Text>>();
+    for (int i = 0; i < columnSize; i++) {
+      int numPairs = dataInput.readInt();
+      Text colFam = new Text();
+      colFam.readFields(dataInput);
+      if (numPairs == 1) {
+        columns.add(new Pair<Text,Text>(colFam, null));
+      } else if (numPairs == 2) {
+        Text colQual = new Text();
+        colQual.readFields(dataInput);
+        columns.add(new Pair<Text,Text>(colFam, colQual));
+      }
+    }
+    autoAdjustRanges = dataInput.readBoolean();
+    useLocalIterators = dataInput.readBoolean();
+    useIsolatedScanners = dataInput.readBoolean();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TableQueryConfig that = (TableQueryConfig) o;
+
+    if (autoAdjustRanges != that.autoAdjustRanges) return false;
+    if (useLocalIterators != that.useLocalIterators) return false;
+    if (useIsolatedScanners != that.useIsolatedScanners) return false;
+    if (columns != null ? !columns.equals(that.columns) : that.columns != null) return false;
+    if (iterators != null ? !iterators.equals(that.iterators) : that.iterators != null) return false;
+    if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null) return false;
+    if (tableName != null ? !tableName.equals(that.tableName) : that.tableName != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = tableName != null ? tableName.hashCode() : 0;
+    result = 31 * result + (iterators != null ? iterators.hashCode() : 0);
+    result = 31 * result + (ranges != null ? ranges.hashCode() : 0);
+    result = 31 * result + (columns != null ? columns.hashCode() : 0);
+    result = 31 * result + (autoAdjustRanges ? 1 : 0);
+    result = 31 * result + (useLocalIterators ? 1 : 0);
+    result = 31 * result + (useIsolatedScanners ? 1 : 0);
+    return result;
+  }
+}

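Because TableQueryConfig implements Writable, a configuration written into a job can be reconstructed on the other side. A hedged sketch of the round trip, presumably the sort of thing the new TableQueryConfigTest exercises (variable names here are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.Collections;

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.conf.TableQueryConfig;

    TableQueryConfig original = new TableQueryConfig("foo")
        .setIterators(Collections.singletonList(
            new IteratorSetting(50, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")));
    // write(DataOutput) and the DataInput constructor form the Writable round trip
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(baos));
    TableQueryConfig copy = new TableQueryConfig(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    assert original.equals(copy); // equality covers name, iterators, ranges, columns, and flags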
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java b/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
index 5b9949a..3a00518 100644
--- a/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
+++ b/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
@@ -1690,7 +1690,7 @@ import org.slf4j.LoggerFactory;
             return TINFO;
           case 1: // CREDENTIALS
             return CREDENTIALS;
-          case 2: // TABLE_NAME
+          case 2: // TABLE
             return TABLE_NAME;
           default:
             return null;
@@ -2114,7 +2114,7 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
-            case 2: // TABLE_NAME
+            case 2: // TABLE
               if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
                 struct.tableName = iprot.readString();
                 struct.setTableNameIsSet(true);
@@ -2824,7 +2824,7 @@ import org.slf4j.LoggerFactory;
             return TINFO;
           case 1: // CREDENTIALS
             return CREDENTIALS;
-          case 2: // TABLE_NAME
+          case 2: // TABLE
             return TABLE_NAME;
           case 6: // START_ROW
             return START_ROW;
@@ -3562,7 +3562,7 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
-            case 2: // TABLE_NAME
+            case 2: // TABLE
               if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
                 struct.tableName = iprot.readString();
                 struct.setTableNameIsSet(true);
@@ -4254,7 +4254,7 @@ import org.slf4j.LoggerFactory;
             return TINFO;
           case 1: // CREDENTIALS
             return CREDENTIALS;
-          case 2: // TABLE_NAME
+          case 2: // TABLE
             return TABLE_NAME;
           case 3: // PROPERTY
             return PROPERTY;
@@ -4826,7 +4826,7 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
-            case 2: // TABLE_NAME
+            case 2: // TABLE
               if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
                 struct.tableName = iprot.readString();
                 struct.setTableNameIsSet(true);
@@ -5473,7 +5473,7 @@ import org.slf4j.LoggerFactory;
             return TINFO;
           case 1: // CREDENTIALS
             return CREDENTIALS;
-          case 2: // TABLE_NAME
+          case 2: // TABLE
             return TABLE_NAME;
           case 3: // PROPERTY
             return PROPERTY;
@@ -5971,7 +5971,7 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
-            case 2: // TABLE_NAME
+            case 2: // TABLE
               if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
                 struct.tableName = iprot.readString();
                 struct.setTableNameIsSet(true);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java b/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
index 0c8ba07..2379873 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
@@ -60,4 +60,9 @@ public class ArgumentChecker {
     if (i <= 0)
       throw new IllegalArgumentException("integer should be > 0, was " + i);
   }
+  
+  public static final void notEmpty(Iterable<?> arg) {
+    if (!arg.iterator().hasNext())
+      throw new IllegalArgumentException("Argument should not be empty");
+  }
 }

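The new notEmpty check fails fast on empty collections; a small illustrative use (the call site is hypothetical, not from this patch):

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.accumulo.core.util.ArgumentChecker;

    ArgumentChecker.notEmpty(Arrays.asList("table1", "table2")); // passes
    ArgumentChecker.notEmpty(Collections.emptyList());           // throws IllegalArgumentException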
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java
index 4f527e1..96113b1 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.core.client.mapred;
 
+import static org.apache.commons.codec.binary.Base64.encodeBase64;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -64,19 +65,18 @@ public class AccumuloInputFormatTest {
   @Test
   public void testSetIterator() throws IOException {
     JobConf job = new JobConf();
-    
     IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator");
     AccumuloInputFormat.addIterator(job, is);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     is.write(new DataOutputStream(baos));
     String iterators = job.get("AccumuloInputFormat.ScanOpts.Iterators");
-    assertEquals(new String(Base64.encodeBase64(baos.toByteArray())), iterators);
+    assertEquals(new String(encodeBase64(baos.toByteArray())), iterators);
   }
   
   @Test
   public void testAddIterator() throws IOException {
     JobConf job = new JobConf();
-    
+
     AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
     AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
     IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator");
@@ -152,7 +152,7 @@ public class AccumuloInputFormatTest {
   @Test
   public void testGetIteratorSettings() throws IOException {
     JobConf job = new JobConf();
-    
+
     AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
     AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
     AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"));
@@ -185,7 +185,7 @@ public class AccumuloInputFormatTest {
     JobConf job = new JobConf();
     
     String regex = ">\"*%<>\'\\";
-    
+
     IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
     RegExFilter.setRegexs(is, regex, null, null, null, false);
     AccumuloInputFormat.addIterator(job, is);

