accumulo-commits mailing list archives

From vi...@apache.org
Subject svn commit: r1437726 [3/10] - in /accumulo/branches/ACCUMULO-259: ./ assemble/ conf/examples/1GB/native-standalone/ conf/examples/1GB/standalone/ conf/examples/2GB/native-standalone/ conf/examples/2GB/standalone/ conf/examples/3GB/native-standalone/ co...
Date Wed, 23 Jan 2013 20:52:04 GMT
Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Wed Jan 23 20:51:59 2013
@@ -16,20 +16,17 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.math.BigInteger;
 import java.net.InetAddress;
-import java.nio.ByteBuffer;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -54,31 +51,27 @@ import org.apache.accumulo.core.client.Z
 import org.apache.accumulo.core.client.impl.OfflineScanner;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.mapreduce.util.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.mock.MockTabletLocator;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.security.tokens.InstanceTokenWrapper;
+import org.apache.accumulo.core.security.tokens.AccumuloToken;
 import org.apache.accumulo.core.security.tokens.UserPassToken;
-import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -86,553 +79,438 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
- * This class allows MapReduce jobs to use Accumulo as the source of data. This input format provides keys and values of type K and V to the Map() and Reduce()
- * functions.
- * 
- * Subclasses must implement the following method: public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws
- * IOException, InterruptedException
- * 
- * This class includes a static class that can be used to create a RecordReader: protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V>
- * 
- * Subclasses of RecordReaderBase must implement the following method: public boolean nextKeyValue() throws IOException, InterruptedException This method should
- * set the following variables: K currentK V currentV Key currentKey (used for progress reporting) int numKeysRead (used for progress reporting)
- * 
- * See AccumuloInputFormat for an example implementation.
- * 
- * Other static methods are optional
+ * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs.
+ * <p>
+ * Subclasses must implement a {@link #createRecordReader(InputSplit, TaskAttemptContext)} to provide a {@link RecordReader} for K,V.
+ * <p>
+ * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value} pairs, but one must implement its
+ * RecordReaderBase.nextKeyValue() to transform them to the desired generic types K,V.
+ * <p>
+ * See {@link AccumuloInputFormat} for an example implementation.
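+ * <p>
+ * A rough configuration sketch (the instance name, ZooKeeper hosts, table name, and credentials below are placeholder values, not part of this change):
+ * 
+ * <pre>
+ * {@code
+ * // all literal values below are placeholders
+ * Job job = new Job(new Configuration());
+ * job.setInputFormatClass(AccumuloInputFormat.class);
+ * AccumuloInputFormat.setConnectorInfo(job, new UserPassToken("user", "pass".getBytes()));
+ * AccumuloInputFormat.setZooKeeperInstance(job, "instanceName", "zoo1,zoo2,zoo3");
+ * AccumuloInputFormat.setInputTableName(job, "table");
+ * AccumuloInputFormat.setScanAuthorizations(job, new Authorizations("public"));
+ * }
+ * </pre>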
  */
-
 public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
-  protected static final Logger log = Logger.getLogger(InputFormatBase.class);
-  
-  private static final String PREFIX = AccumuloInputFormat.class.getSimpleName();
-  private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
-  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
-  private static final String USERNAME = PREFIX + ".username";
-  private static final String PASSWORD = PREFIX + ".password";
-  private static final String TABLE_NAME = PREFIX + ".tablename";
-  private static final String AUTHORIZATIONS = PREFIX + ".authorizations";
-  
-  private static final String INSTANCE_NAME = PREFIX + ".instanceName";
-  private static final String ZOOKEEPERS = PREFIX + ".zooKeepers";
-  private static final String MOCK = ".useMockInstance";
-  
-  private static final String RANGES = PREFIX + ".ranges";
-  private static final String AUTO_ADJUST_RANGES = PREFIX + ".ranges.autoAdjust";
-  
-  private static final String COLUMNS = PREFIX + ".columns";
-  private static final String LOGLEVEL = PREFIX + ".loglevel";
-  
-  private static final String ISOLATED = PREFIX + ".isolated";
-  
-  private static final String LOCAL_ITERATORS = PREFIX + ".localiters";
   
-  // Used to specify the maximum # of versions of an Accumulo cell value to return
-  private static final String MAX_VERSIONS = PREFIX + ".maxVersions";
-  
-  // Used for specifying the iterators to be applied
-  private static final String ITERATORS = PREFIX + ".iterators";
-  private static final String ITERATORS_DELIM = ",";
-  
-  private static final String READ_OFFLINE = PREFIX + ".read.offline";
+  private static final Class<?> CLASS = AccumuloInputFormat.class;
+  protected static final Logger log = Logger.getLogger(CLASS);
   
   /**
-   * Enable or disable use of the {@link IsolatedScanner} in this configuration object. By default it is not enabled.
+   * Sets the connector information needed to communicate with Accumulo in this job.
    * 
-   * @param conf
-   *          The Hadoop configuration object
-   * @param enable
-   *          if true, enable usage of the IsolatedScanner. Otherwise, disable.
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param token
+   *          a valid AccumuloToken (the principal must have READ permission on the input table)
+   * @since 1.5.0
    */
-  public static void setIsolated(Configuration conf, boolean enable) {
-    conf.setBoolean(ISOLATED, enable);
+  public static void setConnectorInfo(Job job, AccumuloToken<?,?> token) {
+    InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), token);
   }
   
   /**
-   * Enable or disable use of the {@link ClientSideIteratorScanner} in this configuration object. By default it is not enabled.
+   * Determines if the connector has been configured.
    * 
-   * @param conf
-   *          The Hadoop configuration object
-   * @param enable
-   *          if true, enable usage of the ClientSideInteratorScanner. Otherwise, disable.
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the connector has been configured, false otherwise
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, AccumuloToken)
    */
-  public static void setLocalIterators(Configuration conf, boolean enable) {
-    conf.setBoolean(LOCAL_ITERATORS, enable);
+  protected static Boolean isConnectorInfoSet(JobContext context) {
+    return InputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration());
   }
   
   /**
-   * Initialize the user, table, and authorization information for the configuration object that will be used with an Accumulo InputFormat.
+   * Gets the AccumuloToken from the configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @param user
-   *          a valid accumulo user
-   * @param passwd
-   *          the user's password
-   * @param table
-   *          the table to read
-   * @param auths
-   *          the authorizations used to restrict data read
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the AccumuloToken
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, AccumuloToken)
    */
-  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
-    if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Input info can only be set once per job");
-    conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(user, passwd, table);
-    conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    conf.set(TABLE_NAME, table);
-    if (auths != null && !auths.isEmpty())
-      conf.set(AUTHORIZATIONS, auths.serialize());
+  protected static AccumuloToken<?,?> getToken(JobContext context) {
+    return InputConfigurator.getToken(CLASS, context.getConfiguration());
   }
-  
+
   /**
-   * Configure a {@link ZooKeeperInstance} for this configuration object.
+   * Configures a {@link ZooKeeperInstance} for this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param instanceName
-   *          the accumulo instance name
+   *          the Accumulo instance name
    * @param zooKeepers
    *          a comma-separated list of zookeeper servers
+   * @since 1.5.0
    */
-  public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
-    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per job");
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(instanceName, zooKeepers);
-    conf.set(INSTANCE_NAME, instanceName);
-    conf.set(ZOOKEEPERS, zooKeepers);
+  public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
+    InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
   }
   
   /**
-   * Configure a {@link MockInstance} for this configuration object.
+   * Configures a {@link MockInstance} for this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param instanceName
-   *          the accumulo instance name
+   *          the Accumulo instance name
+   * @since 1.5.0
    */
-  public static void setMockInstance(Configuration conf, String instanceName) {
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    conf.setBoolean(MOCK, true);
-    conf.set(INSTANCE_NAME, instanceName);
+  public static void setMockInstance(Job job, String instanceName) {
+    InputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName);
   }
   
   /**
-   * Set the ranges to map over for this configuration object.
+   * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @param ranges
-   *          the ranges that will be mapped over
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return an Accumulo instance
+   * @since 1.5.0
+   * @see #setZooKeeperInstance(Job, String, String)
+   * @see #setMockInstance(Job, String)
    */
-  public static void setRanges(Configuration conf, Collection<Range> ranges) {
-    ArgumentChecker.notNull(ranges);
-    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
-    try {
-      for (Range r : ranges) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        r.write(new DataOutputStream(baos));
-        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
-      }
-    } catch (IOException ex) {
-      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
-    }
-    conf.setStrings(RANGES, rangeStrings.toArray(new String[0]));
+  protected static Instance getInstance(JobContext context) {
+    return InputConfigurator.getInstance(CLASS, context.getConfiguration());
   }
   
   /**
-   * Disables the adjustment of ranges for this configuration object. By default, overlapping ranges will be merged and ranges will be fit to existing tablet
-   * boundaries. Disabling this adjustment will cause there to be exactly one mapper per range set using {@link #setRanges(Configuration, Collection)}.
+   * Sets the log level for this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param level
+   *          the logging level
+   * @since 1.5.0
    */
-  public static void disableAutoAdjustRanges(Configuration conf) {
-    conf.setBoolean(AUTO_ADJUST_RANGES, false);
+  public static void setLogLevel(Job job, Level level) {
+    InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
   }
   
   /**
-   * Sets the max # of values that may be returned for an individual Accumulo cell. By default, applied before all other Accumulo iterators (highest priority)
-   * leveraged in the scan by the record reader. To adjust priority use setIterator() & setIteratorOptions() w/ the VersioningIterator type explicitly.
+   * Gets the log level from this configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @param maxVersions
-   *          the max number of versions per accumulo cell
-   * @throws IOException
-   *           if maxVersions is < 1
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the log level
+   * @since 1.5.0
+   * @see #setLogLevel(Job, Level)
    */
-  public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
-    if (maxVersions < 1)
-      throw new IOException("Invalid maxVersions: " + maxVersions + ".  Must be >= 1");
-    conf.setInt(MAX_VERSIONS, maxVersions);
+  protected static Level getLogLevel(JobContext context) {
+    return InputConfigurator.getLogLevel(CLASS, context.getConfiguration());
   }
   
   /**
-   * <p>
-   * Enable reading offline tables. This will make the map reduce job directly read the tables files. If the table is not offline, then the job will fail. If
-   * the table comes online during the map reduce job, its likely that the job will fail.
-   * 
-   * <p>
-   * To use this option, the map reduce user will need access to read the accumulo directory in HDFS.
-   * 
-   * <p>
-   * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
-   * on the mappers classpath. The accumulo-site.xml may need to be on the mappers classpath if HDFS or the accumlo directory in HDFS are non-standard.
-   * 
-   * <p>
-   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
-   * reduce over the data many times, it may be better to the compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
-   * reason to do this is that compaction will reduce each tablet in the table to one file, and its faster to read from one file.
-   * 
-   * <p>
-   * There are two possible advantages to reading a tables file directly out of HDFS. First, you may see better read performance. Second, it will support
-   * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
+   * Sets the name of the input table over which this job will scan.
    * 
-   * @param conf
-   *          the job
-   * @param scanOff
-   *          pass true to read offline tables
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param tableName
+   *          the table from which to read
+   * @since 1.5.0
    */
-  
-  public static void setScanOffline(Configuration conf, boolean scanOff) {
-    conf.setBoolean(READ_OFFLINE, scanOff);
+  public static void setInputTableName(Job job, String tableName) {
+    InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
   }
   
   /**
-   * Restricts the columns that will be mapped over for this configuration object.
+   * Gets the table name from the configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @param columnFamilyColumnQualifierPairs
-   *          A pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
-   *          selected. An empty set is the default and is equivalent to scanning the all columns.
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the table name
+   * @since 1.5.0
+   * @see #setInputTableName(Job, String)
    */
-  public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
-    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
-    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
-    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
-      if (column.getFirst() == null)
-        throw new IllegalArgumentException("Column family can not be null");
-      
-      String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())));
-      if (column.getSecond() != null)
-        col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())));
-      columnStrings.add(col);
-    }
-    conf.setStrings(COLUMNS, columnStrings.toArray(new String[0]));
+  protected static String getInputTableName(JobContext context) {
+    return InputConfigurator.getInputTableName(CLASS, context.getConfiguration());
   }
   
   /**
-   * Sets the log level for this configuration object.
+   * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations. Defaults to the empty set.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @param level
-   *          the logging level
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param auths
+   *          the user's authorizations
+   * @since 1.5.0
    */
-  public static void setLogLevel(Configuration conf, Level level) {
-    ArgumentChecker.notNull(level);
-    log.setLevel(level);
-    conf.setInt(LOGLEVEL, level.toInt());
+  public static void setScanAuthorizations(Job job, Authorizations auths) {
+    InputConfigurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths);
   }
   
   /**
-   * Encode an iterator on the input for this configuration object.
+   * Gets the authorizations to set for the scans from the configuration.
    * 
-   * @param conf
-   *          The Hadoop configuration in which to save the iterator configuration
-   * @param cfg
-   *          The configuration of the iterator
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the Accumulo scan authorizations
+   * @since 1.5.0
+   * @see #setScanAuthorizations(Job, Authorizations)
    */
-  public static void addIterator(Configuration conf, IteratorSetting cfg) {
-    // First check to see if anything has been set already
-    String iterators = conf.get(ITERATORS);
-    
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    String newIter;
-    try {
-      cfg.write(new DataOutputStream(baos));
-      newIter = new String(Base64.encodeBase64(baos.toByteArray()));
-      baos.close();
-    } catch (IOException e) {
-      throw new IllegalArgumentException("unable to serialize IteratorSetting");
-    }
-    
-    // No iterators specified yet, create a new string
-    if (iterators == null || iterators.isEmpty()) {
-      iterators = newIter;
-    } else {
-      // append the next iterator & reset
-      iterators = iterators.concat(ITERATORS_DELIM + newIter);
-    }
-    // Store the iterators w/ the job
-    conf.set(ITERATORS, iterators);
+  protected static Authorizations getScanAuthorizations(JobContext context) {
+    return InputConfigurator.getScanAuthorizations(CLASS, context.getConfiguration());
   }
   
   /**
-   * Determines whether a configuration has isolation enabled.
+   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
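+   * 
+   * <p>
+   * For example, to scan only two row ranges (the row bounds below are placeholder values):
+   * 
+   * <pre>
+   * {@code
+   * // row bounds are placeholders
+   * List<Range> ranges = new ArrayList<Range>();
+   * ranges.add(new Range("a", "m"));
+   * ranges.add(new Range("n", "z"));
+   * AccumuloInputFormat.setRanges(job, ranges);
+   * }
+   * </pre>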
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return true if isolation is enabled, false otherwise
-   * @see #setIsolated(Configuration, boolean)
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param ranges
+   *          the ranges that will be mapped over
+   * @since 1.5.0
    */
-  protected static boolean isIsolated(Configuration conf) {
-    return conf.getBoolean(ISOLATED, false);
+  public static void setRanges(Job job, Collection<Range> ranges) {
+    InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
   }
   
   /**
-   * Determines whether a configuration uses local iterators.
+   * Gets the ranges to scan over from a job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return true if uses local iterators, false otherwise
-   * @see #setLocalIterators(Configuration, boolean)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the ranges
+   * @throws IOException
+   *           if the ranges have been encoded improperly
+   * @since 1.5.0
+   * @see #setRanges(Job, Collection)
    */
-  protected static boolean usesLocalIterators(Configuration conf) {
-    return conf.getBoolean(LOCAL_ITERATORS, false);
+  protected static List<Range> getRanges(JobContext context) throws IOException {
+    return InputConfigurator.getRanges(CLASS, context.getConfiguration());
   }
   
   /**
-   * Gets the user name from the configuration.
+   * Restricts the columns that will be mapped over for this job.
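+   * 
+   * <p>
+   * For example, to fetch the entire column family "cf1" and the single column "cf2:cq2" (the names are placeholder values):
+   * 
+   * <pre>
+   * {@code
+   * // family and qualifier names are placeholders
+   * Collection<Pair<Text,Text>> columns = new ArrayList<Pair<Text,Text>>();
+   * columns.add(new Pair<Text,Text>(new Text("cf1"), null));
+   * columns.add(new Pair<Text,Text>(new Text("cf2"), new Text("cq2")));
+   * AccumuloInputFormat.fetchColumns(job, columns);
+   * }
+   * </pre>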
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return the user name
-   * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param columnFamilyColumnQualifierPairs
+   *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
+   *          selected. An empty set is the default and is equivalent to scanning all the columns.
+   * @since 1.5.0
    */
-  protected static String getUsername(Configuration conf) {
-    return conf.get(USERNAME);
+  public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
+    InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
   }
   
   /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
+   * Gets the columns to be mapped over from this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return the BASE64-encoded password
-   * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return a set of columns
+   * @since 1.5.0
+   * @see #fetchColumns(Job, Collection)
    */
-  protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+  protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
+    return InputConfigurator.getFetchedColumns(CLASS, context.getConfiguration());
   }
   
   /**
-   * Gets the table name from the configuration.
+   * Encode an iterator on the input for this job.
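+   * 
+   * <p>
+   * For example, to limit each cell to a single version using the {@link VersioningIterator} (the priority and name used here are arbitrary choices):
+   * 
+   * <pre>
+   * {@code
+   * // priority and name are arbitrary choices
+   * IteratorSetting vers = new IteratorSetting(20, "vers", VersioningIterator.class);
+   * VersioningIterator.setMaxVersions(vers, 1);
+   * AccumuloInputFormat.addIterator(job, vers);
+   * }
+   * </pre>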
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return the table name
-   * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param cfg
+   *          the configuration of the iterator
+   * @since 1.5.0
    */
-  protected static String getTablename(Configuration conf) {
-    return conf.get(TABLE_NAME);
+  public static void addIterator(Job job, IteratorSetting cfg) {
+    InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
   }
   
   /**
-   * Gets the authorizations to set for the scans from the configuration.
+   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return the accumulo scan authorizations
-   * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return a list of iterators
+   * @since 1.5.0
+   * @see #addIterator(Job, IteratorSetting)
    */
-  protected static Authorizations getAuthorizations(Configuration conf) {
-    String authString = conf.get(AUTHORIZATIONS);
-    return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.getBytes());
+  protected static List<IteratorSetting> getIterators(JobContext context) {
+    return InputConfigurator.getIterators(CLASS, context.getConfiguration());
   }
   
   /**
-   * Initializes an Accumulo {@link Instance} based on the configuration.
+   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
+   * Disabling this feature will cause exactly one Map task to be created for each specified range.
+   * 
+   * <p>
+   * By default, this feature is <b>enabled</b>.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return an accumulo instance
-   * @see #setZooKeeperInstance(Configuration, String, String)
-   * @see #setMockInstance(Configuration, String)
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @see #setRanges(Job, Collection)
+   * @since 1.5.0
    */
-  protected static Instance getInstance(Configuration conf) {
-    if (conf.getBoolean(MOCK, false))
-      return new MockInstance(conf.get(INSTANCE_NAME));
-    return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+  public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
+    InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
-   * Initializes an Accumulo {@link TabletLocator} based on the configuration.
+   * Determines whether a configuration has auto-adjust ranges enabled.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return an accumulo tablet locator
-   * @throws TableNotFoundException
-   *           if the table name set on the configuration doesn't exist
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return false if the feature is disabled, true otherwise
+   * @since 1.5.0
+   * @see #setAutoAdjustRanges(Job, boolean)
    */
-  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
-    if (conf.getBoolean(MOCK, false))
-      return new MockTabletLocator();
-    Instance instance = getInstance(conf);
-    String username = getUsername(conf);
-    byte[] password = getPassword(conf);
-    String tableName = getTablename(conf);
-    return TabletLocator.getInstance(instance, new InstanceTokenWrapper(new UserPassToken(username, ByteBuffer.wrap(password)), instance.getInstanceID()), new Text(Tables.getTableId(instance, tableName)));
+  protected static boolean getAutoAdjustRanges(JobContext context) {
+    return InputConfigurator.getAutoAdjustRanges(CLASS, context.getConfiguration());
   }
   
   /**
-   * Gets the ranges to scan over from a configuration object.
+   * Controls the use of the {@link IsolatedScanner} in this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return the ranges
-   * @throws IOException
-   *           if the ranges have been encoded improperly
-   * @see #setRanges(Configuration, Collection)
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
    */
-  protected static List<Range> getRanges(Configuration conf) throws IOException {
-    ArrayList<Range> ranges = new ArrayList<Range>();
-    for (String rangeString : conf.getStringCollection(RANGES)) {
-      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
-      Range range = new Range();
-      range.readFields(new DataInputStream(bais));
-      ranges.add(range);
-    }
-    return ranges;
+  public static void setScanIsolation(Job job, boolean enableFeature) {
+    InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
-   * Gets the columns to be mapped over from this configuration object.
+   * Determines whether a configuration has isolation enabled.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return a set of columns
-   * @see #fetchColumns(Configuration, Collection)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setScanIsolation(Job, boolean)
    */
-  protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
-    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
-    for (String col : conf.getStringCollection(COLUMNS)) {
-      int idx = col.indexOf(":");
-      Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes()) : Base64.decodeBase64(col.substring(0, idx).getBytes()));
-      Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes()));
-      columns.add(new Pair<Text,Text>(cf, cq));
-    }
-    return columns;
+  protected static boolean isIsolated(JobContext context) {
+    return InputConfigurator.isIsolated(CLASS, context.getConfiguration());
   }
   
   /**
-   * Determines whether a configuration has auto-adjust ranges enabled.
+   * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
+   * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return true if auto-adjust is enabled, false otherwise
-   * @see #disableAutoAdjustRanges(Configuration)
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
    */
-  protected static boolean getAutoAdjustRanges(Configuration conf) {
-    return conf.getBoolean(AUTO_ADJUST_RANGES, true);
+  public static void setLocalIterators(Job job, boolean enableFeature) {
+    InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
-   * Gets the log level from this configuration.
+   * Determines whether a configuration uses local iterators.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return the log level
-   * @see #setLogLevel(Configuration, Level)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setLocalIterators(Job, boolean)
    */
-  protected static Level getLogLevel(Configuration conf) {
-    return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
+  protected static boolean usesLocalIterators(JobContext context) {
+    return InputConfigurator.usesLocalIterators(CLASS, context.getConfiguration());
   }
   
-  // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
-   * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
+   * <p>
+   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
+   * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
+   * fail.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @throws IOException
-   *           if the configuration is improperly configured
+   * <p>
+   * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
+   * 
+   * <p>
+   * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
+   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
+   * 
+   * <p>
+   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
+   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
+   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
+   * 
+   * <p>
+   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
+   * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
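+   * <p>
+   * A rough sketch of the clone-and-read approach described above (table names are placeholders; cloning the table and taking the clone offline can be done
+   * through the shell or the table operations API beforehand):
+   * 
+   * <pre>
+   * {@code
+   * // assumes "mytable_clone" was cloned from "mytable" and taken offline already
+   * AccumuloInputFormat.setInputTableName(job, "mytable_clone");
+   * AccumuloInputFormat.setOfflineTableScan(job, true);
+   * }
+   * </pre>
+   * 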
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
    */
-  protected static void validateOptions(Configuration conf) throws IOException {
-    if (!conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
-      throw new IOException("Input info has not been set.");
-    if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IOException("Instance info has not been set.");
-    // validate that we can connect as configured
-    try {
-      Connector c = getInstance(conf).getConnector(getUsername(conf), getPassword(conf));
-      if (!c.securityOperations().authenticateUser(getUsername(conf), getPassword(conf)))
-        throw new IOException("Unable to authenticate user");
-      if (!c.securityOperations().hasTablePermission(getUsername(conf), getTablename(conf), TablePermission.READ))
-        throw new IOException("Unable to access table");
-      
-      if (!usesLocalIterators(conf)) {
-        // validate that any scan-time iterators can be loaded by the the tablet servers
-        for (IteratorSetting iter : getIterators(conf)) {
-          if (!c.instanceOperations().testClassLoad(iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
-            throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
-        }
-      }
-      
-    } catch (AccumuloException e) {
-      throw new IOException(e);
-    } catch (AccumuloSecurityException e) {
-      throw new IOException(e);
-    }
+  public static void setOfflineTableScan(Job job, boolean enableFeature) {
+    InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
-   * Gets the maxVersions to use for the {@link VersioningIterator} from this configuration.
+   * Determines whether a configuration has the offline table scan feature enabled.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return the max versions, -1 if not configured
-   * @see #setMaxVersions(Configuration, int)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setOfflineTableScan(Job, boolean)
    */
-  protected static int getMaxVersions(Configuration conf) {
-    return conf.getInt(MAX_VERSIONS, -1);
+  protected static boolean isOfflineScan(JobContext context) {
+    return InputConfigurator.isOfflineScan(CLASS, context.getConfiguration());
   }
   
-  protected static boolean isOfflineScan(Configuration conf) {
-    return conf.getBoolean(READ_OFFLINE, false);
+  /**
+   * Initializes an Accumulo {@link TabletLocator} based on the configuration.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return an Accumulo tablet locator
+   * @throws TableNotFoundException
+   *           if the table name set on the configuration doesn't exist
+   * @since 1.5.0
+   */
+  protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, context.getConfiguration());
   }
   
-  // Return a list of the iterator settings (for iterators to apply to a scanner)
-  
+  // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
-   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
+   * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return a list of iterators
-   * @see #addIterator(Configuration, IteratorSetting)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @throws IOException
+   *           if the context is improperly configured
+   * @since 1.5.0
    */
-  protected static List<IteratorSetting> getIterators(Configuration conf) {
-    
-    String iterators = conf.get(ITERATORS);
-    
-    // If no iterators are present, return an empty list
-    if (iterators == null || iterators.isEmpty())
-      return new ArrayList<IteratorSetting>();
-    
-    // Compose the set of iterators encoded in the job configuration
-    StringTokenizer tokens = new StringTokenizer(conf.get(ITERATORS), ITERATORS_DELIM);
-    List<IteratorSetting> list = new ArrayList<IteratorSetting>();
-    try {
-      while (tokens.hasMoreTokens()) {
-        String itstring = tokens.nextToken();
-        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes()));
-        list.add(new IteratorSetting(new DataInputStream(bais)));
-        bais.close();
-      }
-    } catch (IOException e) {
-      throw new IllegalArgumentException("couldn't decode iterator settings");
-    }
-    return list;
+  protected static void validateOptions(JobContext context) throws IOException {
+    InputConfigurator.validateOptions(CLASS, context.getConfiguration());
   }
   
+  /**
+   * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
+   * types.
+   * 
+   * Subclasses must implement {@link #nextKeyValue()} and use it to update the following variables:
+   * <ul>
+   * <li>K {@link #currentK}</li>
+   * <li>V {@link #currentV}</li>
+   * <li>Key {@link #currentKey} (used for progress reporting)</li>
+   * <li>int {@link #numKeysRead} (used for progress reporting)</li>
+   * </ul>
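+   * 
+   * <p>
+   * For a reader parameterized as {@code RecordReaderBase<Key,Value>} (as in {@link AccumuloInputFormat}), a minimal sketch of {@code nextKeyValue()} might be:
+   * 
+   * <pre>
+   * {@code
+   * public boolean nextKeyValue() throws IOException, InterruptedException {
+   *   if (scannerIterator.hasNext()) {
+   *     ++numKeysRead;
+   *     Entry<Key,Value> entry = scannerIterator.next();
+   *     currentK = currentKey = entry.getKey();
+   *     currentV = entry.getValue();
+   *     return true;
+   *   }
+   *   return false;
+   * }
+   * }
+   * </pre>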
+   */
   protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V> {
     protected long numKeysRead;
     protected Iterator<Entry<Key,Value>> scannerIterator;
@@ -641,80 +519,56 @@ public abstract class InputFormatBase<K,
     /**
      * Apply the configured iterators from the configuration to the scanner.
      * 
-     * @param conf
-     *          the Hadoop configuration object
+     * @param context
+     *          the Hadoop context for the configured job
      * @param scanner
      *          the scanner to configure
-     * @throws AccumuloException
      */
-    protected void setupIterators(Configuration conf, Scanner scanner) throws AccumuloException {
-      List<IteratorSetting> iterators = getIterators(conf);
+    protected void setupIterators(TaskAttemptContext context, Scanner scanner) {
+      List<IteratorSetting> iterators = getIterators(context);
       for (IteratorSetting iterator : iterators) {
         scanner.addScanIterator(iterator);
       }
     }
     
     /**
-     * If maxVersions has been set, configure a {@link VersioningIterator} at priority 0 for this scanner.
-     * 
-     * @param conf
-     *          the Hadoop configuration object
-     * @param scanner
-     *          the scanner to configure
-     */
-    protected void setupMaxVersions(Configuration conf, Scanner scanner) {
-      int maxVersions = getMaxVersions(conf);
-      // Check to make sure its a legit value
-      if (maxVersions >= 1) {
-        IteratorSetting vers = new IteratorSetting(0, "vers", VersioningIterator.class);
-        VersioningIterator.setMaxVersions(vers, maxVersions);
-        scanner.addScanIterator(vers);
-      }
-    }
-    
-    /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
+    @Override
     public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
-      initialize(inSplit, attempt.getConfiguration());
-    }
-    
-    public void initialize(InputSplit inSplit, Configuration conf) throws IOException {
       Scanner scanner;
       split = (RangeInputSplit) inSplit;
       log.debug("Initializing input split: " + split.range);
-      Instance instance = getInstance(conf);
-      String user = getUsername(conf);
-      byte[] password = getPassword(conf);
-      Authorizations authorizations = getAuthorizations(conf);
+      Instance instance = getInstance(attempt);
+      AccumuloToken<?,?> token = getToken(attempt);
+      Authorizations authorizations = getScanAuthorizations(attempt);
       
       try {
-        log.debug("Creating connector with user: " + user);
-        Connector conn = instance.getConnector(user, password);
-        log.debug("Creating scanner for table: " + getTablename(conf));
+        log.debug("Creating connector with user: " + token.getPrincipal());
+        Connector conn = instance.getConnector(token);
+        log.debug("Creating scanner for table: " + getInputTableName(attempt));
         log.debug("Authorizations are: " + authorizations);
-        if (isOfflineScan(conf)) {
-          scanner = new OfflineScanner(instance, new UserPassToken(user, ByteBuffer.wrap(password)), Tables.getTableId(instance, getTablename(conf)),
+        if (isOfflineScan(attempt)) {
+          scanner = new OfflineScanner(instance, token, Tables.getTableId(instance, getInputTableName(attempt)),
               authorizations);
         } else {
-          scanner = conn.createScanner(getTablename(conf), authorizations);
+          scanner = conn.createScanner(getInputTableName(attempt), authorizations);
         }
-        if (isIsolated(conf)) {
+        if (isIsolated(attempt)) {
           log.info("Creating isolated scanner");
           scanner = new IsolatedScanner(scanner);
         }
-        if (usesLocalIterators(conf)) {
+        if (usesLocalIterators(attempt)) {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupMaxVersions(conf, scanner);
-        setupIterators(conf, scanner);
+        setupIterators(attempt, scanner);
       } catch (Exception e) {
         throw new IOException(e);
       }
       
       // setup a scanner within the bounds of this split
-      for (Pair<Text,Text> c : getFetchedColumns(conf)) {
+      for (Pair<Text,Text> c : getFetchedColumns(attempt)) {
         if (c.getSecond() != null) {
           log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
           scanner.fetchColumn(c.getFirst(), c.getSecond());
@@ -732,8 +586,10 @@ public abstract class InputFormatBase<K,
       scannerIterator = scanner.iterator();
     }
     
+    @Override
     public void close() {}
     
+    @Override
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
@@ -756,13 +612,13 @@ public abstract class InputFormatBase<K,
     }
   }
   
-  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(Configuration conf, String tableName, List<Range> ranges) throws TableNotFoundException,
+  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
       AccumuloException, AccumuloSecurityException {
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     
-    Instance instance = getInstance(conf);
-    Connector conn = instance.getConnector(getUsername(conf), getPassword(conf));
+    Instance instance = getInstance(context);
+    Connector conn = instance.getConnector(getToken(context));
     String tableId = Tables.getTableId(instance, tableName);
     
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
@@ -859,17 +715,14 @@ public abstract class InputFormatBase<K,
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
-    return getSplits(job.getConfiguration());
-  }
-  
-  public List<InputSplit> getSplits(Configuration conf) throws IOException {
-    log.setLevel(getLogLevel(conf));
-    validateOptions(conf);
-    
-    String tableName = getTablename(conf);
-    boolean autoAdjust = getAutoAdjustRanges(conf);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(conf)) : getRanges(conf);
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    log.setLevel(getLogLevel(context));
+    validateOptions(context);
+    
+    String tableName = getInputTableName(context);
+    boolean autoAdjust = getAutoAdjustRanges(context);
+    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
     
     if (ranges.isEmpty()) {
       ranges = new ArrayList<Range>(1);
@@ -880,17 +733,17 @@ public abstract class InputFormatBase<K,
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     TabletLocator tl;
     try {
-      if (isOfflineScan(conf)) {
-        binnedRanges = binOfflineTable(conf, tableName, ranges);
+      if (isOfflineScan(context)) {
+        binnedRanges = binOfflineTable(context, tableName, ranges);
         while (binnedRanges == null) {
           // Some tablets were still online, try again
           UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-          binnedRanges = binOfflineTable(conf, tableName, ranges);
+          binnedRanges = binOfflineTable(context, tableName, ranges);
         }
       } else {
-        Instance instance = getInstance(conf);
+        Instance instance = getInstance(context);
         String tableId = null;
-        tl = getTabletLocator(conf);
+        tl = getTabletLocator(context);
         // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
         tl.invalidateCache();
         while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
@@ -1025,6 +878,7 @@ public abstract class InputFormatBase<K,
     /**
      * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value.
      */
+    @Override
     public long getLength() throws IOException {
       Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
       Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
@@ -1044,6 +898,7 @@ public abstract class InputFormatBase<K,
       return diff + 1;
     }
     
+    @Override
     public String[] getLocations() throws IOException {
       return locations;
     }
@@ -1052,6 +907,7 @@ public abstract class InputFormatBase<K,
       this.locations = locations;
     }
     
+    @Override
     public void readFields(DataInput in) throws IOException {
       range.readFields(in);
       int numLocs = in.readInt();
@@ -1060,6 +916,7 @@ public abstract class InputFormatBase<K,
         locations[i] = in.readUTF();
     }
     
+    @Override
     public void write(DataOutput out) throws IOException {
       range.write(out);
       out.writeInt(locations.length);
@@ -1067,4 +924,381 @@ public abstract class InputFormatBase<K,
         out.writeUTF(locations[i]);
     }
   }
+  
+  // ----------------------------------------------------------------------------------------------------
+  // Everything below this line is deprecated and should go away in future versions
+  // ----------------------------------------------------------------------------------------------------
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setScanIsolation(Job, boolean)} instead.
+   */
+  @Deprecated
+  public static void setIsolated(Configuration conf, boolean enable) {
+    InputConfigurator.setScanIsolation(CLASS, conf, enable);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setLocalIterators(Job, boolean)} instead.
+   */
+  @Deprecated
+  public static void setLocalIterators(Configuration conf, boolean enable) {
+    InputConfigurator.setLocalIterators(CLASS, conf, enable);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, AccumuloToken)}, {@link #setInputTableName(Job, String)}, and
+   *             {@link #setScanAuthorizations(Job, Authorizations)} instead.
+   */
+  @Deprecated
+  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
+    InputConfigurator.setConnectorInfo(CLASS, conf, new UserPassToken(user, passwd));
+    InputConfigurator.setInputTableName(CLASS, conf, table);
+    InputConfigurator.setScanAuthorizations(CLASS, conf, auths);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead.
+   */
+  @Deprecated
+  public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
+    InputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead.
+   */
+  @Deprecated
+  public static void setMockInstance(Configuration conf, String instanceName) {
+    InputConfigurator.setMockInstance(CLASS, conf, instanceName);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setRanges(Job, Collection)} instead.
+   */
+  @Deprecated
+  public static void setRanges(Configuration conf, Collection<Range> ranges) {
+    InputConfigurator.setRanges(CLASS, conf, ranges);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setAutoAdjustRanges(Job, boolean)} instead.
+   */
+  @Deprecated
+  public static void disableAutoAdjustRanges(Configuration conf) {
+    InputConfigurator.setAutoAdjustRanges(CLASS, conf, false);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
+   */
+  @Deprecated
+  public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
+    IteratorSetting vers = new IteratorSetting(0, "vers", VersioningIterator.class);
+    VersioningIterator.setMaxVersions(vers, maxVersions);
+    InputConfigurator.addIterator(CLASS, conf, vers);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setOfflineTableScan(Job, boolean)} instead.
+   */
+  @Deprecated
+  public static void setScanOffline(Configuration conf, boolean scanOff) {
+    InputConfigurator.setOfflineTableScan(CLASS, conf, scanOff);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #fetchColumns(Job, Collection)} instead.
+   */
+  @Deprecated
+  public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
+    InputConfigurator.fetchColumns(CLASS, conf, columnFamilyColumnQualifierPairs);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead.
+   */
+  @Deprecated
+  public static void setLogLevel(Configuration conf, Level level) {
+    InputConfigurator.setLogLevel(CLASS, conf, level);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} instead.
+   */
+  @Deprecated
+  public static void addIterator(Configuration conf, IteratorSetting cfg) {
+    InputConfigurator.addIterator(CLASS, conf, cfg);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #isIsolated(JobContext)} instead.
+   */
+  @Deprecated
+  protected static boolean isIsolated(Configuration conf) {
+    return InputConfigurator.isIsolated(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #usesLocalIterators(JobContext)} instead.
+   */
+  @Deprecated
+  protected static boolean usesLocalIterators(Configuration conf) {
+    return InputConfigurator.usesLocalIterators(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
+   */
+  @Deprecated
+  protected static String getUsername(Configuration conf) {
+    return InputConfigurator.getToken(CLASS, conf).getPrincipal();
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
+   */
+  @Deprecated
+  protected static byte[] getPassword(Configuration conf) {
+    AccumuloToken<?,?> token = InputConfigurator.getToken(CLASS, conf);
+    if (token instanceof UserPassToken) {
+      UserPassToken upt = (UserPassToken) token;
+      return upt.getPassword();
+    }
+    throw new RuntimeException("Not applicable for non-UserPassTokens");
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getInputTableName(JobContext)} instead.
+   */
+  @Deprecated
+  protected static String getTablename(Configuration conf) {
+    return InputConfigurator.getInputTableName(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getScanAuthorizations(JobContext)} instead.
+   */
+  @Deprecated
+  protected static Authorizations getAuthorizations(Configuration conf) {
+    return InputConfigurator.getScanAuthorizations(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead.
+   */
+  @Deprecated
+  protected static Instance getInstance(Configuration conf) {
+    return InputConfigurator.getInstance(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getTabletLocator(JobContext)} instead.
+   */
+  @Deprecated
+  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getRanges(JobContext)} instead.
+   */
+  @Deprecated
+  protected static List<Range> getRanges(Configuration conf) throws IOException {
+    return InputConfigurator.getRanges(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getFetchedColumns(JobContext)} instead.
+   */
+  @Deprecated
+  protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
+    return InputConfigurator.getFetchedColumns(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getAutoAdjustRanges(JobContext)} instead.
+   */
+  @Deprecated
+  protected static boolean getAutoAdjustRanges(Configuration conf) {
+    return InputConfigurator.getAutoAdjustRanges(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead.
+   */
+  @Deprecated
+  protected static Level getLogLevel(Configuration conf) {
+    return InputConfigurator.getLogLevel(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #validateOptions(JobContext)} instead.
+   */
+  @Deprecated
+  protected static void validateOptions(Configuration conf) throws IOException {
+    InputConfigurator.validateOptions(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
+   */
+  @Deprecated
+  protected static int getMaxVersions(Configuration conf) {
+    // This is convoluted: the only reason to read maxVersions here is to rebuild the same kind of IteratorSetting that has to be taken apart to reach the
+    // option in the first place, but doing so preserves the behavior of the deprecated API.
+    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
+    for (IteratorSetting setting : iteratorSettings) {
+      if ("vers".equals(setting.getName()) && 0 == setting.getPriority() && VersioningIterator.class.getName().equals(setting.getIteratorClass())) {
+        if (setting.getOptions().containsKey("maxVersions"))
+          return Integer.parseInt(setting.getOptions().get("maxVersions"));
+        else
+          return -1;
+      }
+    }
+    return -1;
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #isOfflineScan(JobContext)} instead.
+   */
+  @Deprecated
+  protected static boolean isOfflineScan(Configuration conf) {
+    return InputConfigurator.isOfflineScan(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
+   */
+  @Deprecated
+  protected static List<AccumuloIterator> getIterators(Configuration conf) {
+    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
+    List<AccumuloIterator> deprecatedIterators = new ArrayList<AccumuloIterator>(iteratorSettings.size());
+    for (IteratorSetting setting : iteratorSettings) {
+      AccumuloIterator deprecatedIter = new AccumuloIterator(new String(setting.getPriority() + AccumuloIterator.FIELD_SEP + setting.getIteratorClass()
+          + AccumuloIterator.FIELD_SEP + setting.getName()));
+      deprecatedIterators.add(deprecatedIter);
+    }
+    return deprecatedIterators;
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
+   */
+  @Deprecated
+  protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) {
+    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
+    List<AccumuloIteratorOption> deprecatedIteratorOptions = new ArrayList<AccumuloIteratorOption>(iteratorSettings.size());
+    for (IteratorSetting setting : iteratorSettings) {
+      for (Entry<String,String> opt : setting.getOptions().entrySet()) {
+        String deprecatedOption;
+        try {
+          deprecatedOption = new String(setting.getName() + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getKey(), "UTF-8")
+              + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getValue(), "UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+          throw new RuntimeException(e);
+        }
+        deprecatedIteratorOptions.add(new AccumuloIteratorOption(deprecatedOption));
+      }
+    }
+    return deprecatedIteratorOptions;
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
+   */
+  @Deprecated
+  static class AccumuloIterator {
+    
+    private static final String FIELD_SEP = ":";
+    
+    private int priority;
+    private String iteratorClass;
+    private String iteratorName;
+    
+    public AccumuloIterator(int priority, String iteratorClass, String iteratorName) {
+      this.priority = priority;
+      this.iteratorClass = iteratorClass;
+      this.iteratorName = iteratorName;
+    }
+    
+    // Parses out a setting given a string supplied from an earlier toString() call
+    public AccumuloIterator(String iteratorSetting) {
+      // Parse the string to expand the iterator
+      StringTokenizer tokenizer = new StringTokenizer(iteratorSetting, FIELD_SEP);
+      priority = Integer.parseInt(tokenizer.nextToken());
+      iteratorClass = tokenizer.nextToken();
+      iteratorName = tokenizer.nextToken();
+    }
+    
+    public int getPriority() {
+      return priority;
+    }
+    
+    public String getIteratorClass() {
+      return iteratorClass;
+    }
+    
+    public String getIteratorName() {
+      return iteratorName;
+    }
+    
+    @Override
+    public String toString() {
+      return new String(priority + FIELD_SEP + iteratorClass + FIELD_SEP + iteratorName);
+    }
+    
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
+   */
+  @Deprecated
+  static class AccumuloIteratorOption {
+    private static final String FIELD_SEP = ":";
+    
+    private String iteratorName;
+    private String key;
+    private String value;
+    
+    public AccumuloIteratorOption(String iteratorName, String key, String value) {
+      this.iteratorName = iteratorName;
+      this.key = key;
+      this.value = value;
+    }
+    
+    // Parses out an option given a string supplied from an earlier toString() call
+    public AccumuloIteratorOption(String iteratorOption) {
+      StringTokenizer tokenizer = new StringTokenizer(iteratorOption, FIELD_SEP);
+      this.iteratorName = tokenizer.nextToken();
+      try {
+        this.key = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
+        this.value = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+    public String getIteratorName() {
+      return iteratorName;
+    }
+    
+    public String getKey() {
+      return key;
+    }
+    
+    public String getValue() {
+      return value;
+    }
+    
+    @Override
+    public String toString() {
+      try {
+        return new String(iteratorName + FIELD_SEP + URLEncoder.encode(key, "UTF-8") + FIELD_SEP + URLEncoder.encode(value, "UTF-8"));
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+  }
+  
 }
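
The deprecated methods above all forward to the new Job-based setters that their Javadoc points to. A minimal sketch of that replacement usage follows; the class name, table name, range bounds, column family, and iterator priority are placeholders for illustration and are not taken from this commit.

import java.util.Collections;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class InputFormatSetupSketch {
  public static void configure(Job job) {
    // Table, range, and column values are illustrative placeholders.
    AccumuloInputFormat.setInputTableName(job, "mytable");
    AccumuloInputFormat.setRanges(job, Collections.singleton(new Range("a", "z")));
    AccumuloInputFormat.fetchColumns(job, Collections.singleton(new Pair<Text,Text>(new Text("cf"), null)));
    // Replacement for the deprecated setMaxVersions(conf, n): add the VersioningIterator explicitly.
    IteratorSetting vers = new IteratorSetting(1, "vers", VersioningIterator.class);
    VersioningIterator.setMaxVersions(vers, 1);
    AccumuloInputFormat.addIterator(job, vers);
  }
}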

Copied: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java (from r1437607, accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java)
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java?p2=accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java&p1=accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java&r1=1437607&r2=1437726&rev=1437726&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java Wed Jan 23 20:51:59 2013
@@ -16,13 +16,12 @@
  */
 package org.apache.accumulo.core.client.mapreduce.util;
 
-import java.nio.charset.Charset;
-
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.security.tokens.AccumuloToken;
+import org.apache.accumulo.core.security.tokens.TokenHelper;
 import org.apache.accumulo.core.util.ArgumentChecker;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
@@ -39,7 +38,7 @@ public class ConfiguratorBase {
    * @since 1.5.0
    */
   public static enum ConnectorInfo {
-    IS_CONFIGURED, USER_NAME, PASSWORD
+    IS_CONFIGURED, TOKEN
   }
   
   /**
@@ -81,20 +80,17 @@ public class ConfiguratorBase {
    *          the class whose name will be used as a prefix for the property configuration key
    * @param conf
    *          the Hadoop configuration object to configure
-   * @param user
-   *          a valid Accumulo user name
-   * @param passwd
-   *          the user's password
+   * @param token
+   *          a valid AccumuloToken
    * @since 1.5.0
    */
-  public static void setConnectorInfo(Class<?> implementingClass, Configuration conf, String user, byte[] passwd) {
+  public static void setConnectorInfo(Class<?> implementingClass, Configuration conf, AccumuloToken<?,?> token) {
     if (isConnectorInfoSet(implementingClass, conf))
       throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job");
     
-    ArgumentChecker.notNull(user, passwd);
+    ArgumentChecker.notNull(token);
     conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
-    conf.set(enumToConfKey(implementingClass, ConnectorInfo.USER_NAME), user);
-    conf.set(enumToConfKey(implementingClass, ConnectorInfo.PASSWORD), new String(Base64.encodeBase64(passwd), Charset.forName("UTF-8")));
+    conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), TokenHelper.asBase64String(token));
   }
   
   /**
@@ -112,34 +108,18 @@ public class ConfiguratorBase {
   }
   
   /**
-   * Gets the user name from the configuration.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return the user name
-   * @since 1.5.0
-   * @see #setConnectorInfo(Class, Configuration, String, byte[])
-   */
-  public static String getUsername(Class<?> implementingClass, Configuration conf) {
-    return conf.get(enumToConfKey(implementingClass, ConnectorInfo.USER_NAME));
-  }
-  
-  /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
+   * Gets the AccumuloToken from the configuration.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
    * @param conf
    *          the Hadoop configuration object to configure
-   * @return the decoded user password
+   * @return the identifying AccumuloToken
    * @since 1.5.0
-   * @see #setConnectorInfo(Class, Configuration, String, byte[])
+   * @see #setConnectorInfo(Class, Configuration, AccumuloToken)
    */
-  public static byte[] getPassword(Class<?> implementingClass, Configuration conf) {
-    return Base64.decodeBase64(conf.get(enumToConfKey(implementingClass, ConnectorInfo.PASSWORD), "").getBytes(Charset.forName("UTF-8")));
+  public static AccumuloToken<?,?> getToken(Class<?> implementingClass, Configuration conf) {
+    return TokenHelper.fromBase64String(conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN)));
   }
   
   /**

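The hunk above replaces the separate USER_NAME and Base64-encoded PASSWORD keys with a single serialized TOKEN key. A minimal sketch of a caller using the new methods from this hunk; the UserPassToken (String, byte[]) constructor is an assumption based on its use elsewhere in this branch, and the user name and password are placeholders.

import org.apache.accumulo.core.client.mapreduce.util.ConfiguratorBase;
import org.apache.accumulo.core.security.tokens.AccumuloToken;
import org.apache.accumulo.core.security.tokens.UserPassToken;
import org.apache.hadoop.conf.Configuration;

public class ConnectorInfoSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumption: UserPassToken offers a (username, password) constructor in this branch.
    AccumuloToken<?,?> token = new UserPassToken("root", "secret".getBytes());
    // Serializes the token (via TokenHelper) into a single configuration property.
    ConfiguratorBase.setConnectorInfo(ConnectorInfoSketch.class, conf, token);
    // Later, e.g. inside a RecordReader, the same token is recovered from the configuration.
    AccumuloToken<?,?> recovered = ConfiguratorBase.getToken(ConnectorInfoSketch.class, conf);
    System.out.println(recovered.getPrincipal());
  }
}
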
Copied: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java (from r1437607, accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java)
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java?p2=accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java&p1=accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java&r1=1437607&r2=1437726&rev=1437726&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java Wed Jan 23 20:51:59 2013
@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -47,7 +46,7 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.security.tokens.AccumuloToken;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
@@ -481,11 +480,9 @@ public class InputConfigurator extends C
     if ("MockInstance".equals(instanceType))
       return new MockTabletLocator();
     Instance instance = getInstance(implementingClass, conf);
-    String username = getUsername(implementingClass, conf);
-    byte[] password = getPassword(implementingClass, conf);
+    AccumuloToken<?,?> token = getToken(implementingClass, conf);
     String tableName = getInputTableName(implementingClass, conf);
-    return TabletLocator.getInstance(instance, new AuthInfo(username, ByteBuffer.wrap(password), instance.getInstanceID()),
-        new Text(Tables.getTableId(instance, tableName)));
+    return TabletLocator.getInstance(instance, token, new Text(Tables.getTableId(instance, tableName)));
   }
   
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -508,10 +505,11 @@ public class InputConfigurator extends C
       throw new IOException("Instance info has not been set.");
     // validate that we can connect as configured
     try {
-      Connector c = getInstance(implementingClass, conf).getConnector(getUsername(implementingClass, conf), getPassword(implementingClass, conf));
-      if (!c.securityOperations().authenticateUser(getUsername(implementingClass, conf), getPassword(implementingClass, conf)))
+      Connector c = getInstance(implementingClass, conf).getConnector(getToken(implementingClass, conf));
+      if (!c.securityOperations().authenticateUser(getToken(implementingClass, conf)))
         throw new IOException("Unable to authenticate user");
-      if (!c.securityOperations().hasTablePermission(getUsername(implementingClass, conf), getInputTableName(implementingClass, conf), TablePermission.READ))
+      if (!c.securityOperations().hasTablePermission(getToken(implementingClass, conf).getPrincipal(), getInputTableName(implementingClass, conf),
+          TablePermission.READ))
         throw new IOException("Unable to access table");
       
       if (!conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false)) {

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstanceOperations.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstanceOperations.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstanceOperations.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstanceOperations.java Wed Jan 23 20:51:59 2013
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.admin.ActiveCompaction;
 import org.apache.accumulo.core.client.admin.ActiveScan;
 import org.apache.accumulo.core.client.admin.InstanceOperations;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
@@ -114,4 +115,25 @@ public class MockInstanceOperations impl
     }
     return true;
   }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.accumulo.core.client.admin.InstanceOperations#getActiveCompactions(java.lang.String)
+   */
+  @Override
+  public List<ActiveCompaction> getActiveCompactions(String tserver) throws AccumuloException, AccumuloSecurityException {
+    return new ArrayList<ActiveCompaction>();
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.accumulo.core.client.admin.InstanceOperations#ping(java.lang.String)
+   */
+  @Override
+  public void ping(String tserver) throws AccumuloException {
+    // TODO Auto-generated method stub
+    
+  }
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mock/MockSecurityOperations.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mock/MockSecurityOperations.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mock/MockSecurityOperations.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mock/MockSecurityOperations.java Wed Jan 23 20:51:59 2013
@@ -85,7 +85,15 @@ public class MockSecurityOperations impl
       return false;
     return Arrays.equals(user.password, password);
   }
-  
+
+  @Override
+  public boolean authenticateUser(AccumuloToken<?,?> token) throws AccumuloException, AccumuloSecurityException {
+    MockUser user = acu.users.get(token.getPrincipal());
+    if (user == null)
+      return false;
+    return Arrays.equals(user.password, ((UserPassToken) token).getPassword());
+  }
+
   /**
    * @deprecated @since 1.5, use {@link #changeUserPassword(AccumuloToken)}
    */

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/data/ColumnUpdate.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/data/ColumnUpdate.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/data/ColumnUpdate.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/data/ColumnUpdate.java Wed Jan 23 20:51:59 2013
@@ -47,6 +47,7 @@ public class ColumnUpdate {
    * @deprecated use setTimestamp(long);
    * @param timestamp
    */
+  @Deprecated
   public void setSystemTimestamp(long timestamp) {
     if (hasTimestamp)
       throw new IllegalStateException("Cannot set system timestamp when user set a timestamp");
@@ -84,23 +85,26 @@ public class ColumnUpdate {
     return this.val;
   }
   
+  @Override
   public String toString() {
     return new String(Arrays.toString(columnFamily)) + ":" + new String(Arrays.toString(columnQualifier)) + " ["
         + new String(Arrays.toString(columnVisibility)) + "] " + (hasTimestamp ? timestamp : "NO_TIME_STAMP") + " " + Arrays.toString(val) + " " + deleted;
   }
-
+  
   @Override
   public boolean equals(Object obj) {
     if (!(obj instanceof ColumnUpdate))
       return false;
-    ColumnUpdate upd = (ColumnUpdate)obj;
-    return Arrays.equals(getColumnFamily(), upd.getColumnFamily()) &&
-        Arrays.equals(getColumnQualifier(), upd.getColumnQualifier()) &&
-        Arrays.equals(getColumnVisibility(), upd.getColumnVisibility()) &&
-        isDeleted() == upd.isDeleted() &&
-        Arrays.equals(getValue(), upd.getValue()) &&
-        hasTimestamp() == upd.hasTimestamp() &&
-        getTimestamp() == upd.getTimestamp();
+    ColumnUpdate upd = (ColumnUpdate) obj;
+    return Arrays.equals(getColumnFamily(), upd.getColumnFamily()) && Arrays.equals(getColumnQualifier(), upd.getColumnQualifier())
+        && Arrays.equals(getColumnVisibility(), upd.getColumnVisibility()) && isDeleted() == upd.isDeleted() && Arrays.equals(getValue(), upd.getValue())
+        && hasTimestamp() == upd.hasTimestamp() && getTimestamp() == upd.getTimestamp();
   }
   
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(columnFamily) + Arrays.hashCode(columnQualifier) + Arrays.hashCode(columnVisibility)
+        + (hasTimestamp ? (Boolean.TRUE.hashCode() + new Long(timestamp).hashCode()) : Boolean.FALSE.hashCode())
+        + (deleted ? Boolean.TRUE.hashCode() : (Boolean.FALSE.hashCode() + Arrays.hashCode(val)));
+  }
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/data/PartialKey.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/data/PartialKey.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/data/PartialKey.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/data/PartialKey.java Wed Jan 23 20:51:59 2013
@@ -17,10 +17,9 @@
 package org.apache.accumulo.core.data;
 
 public enum PartialKey {
-  ROW(1), ROW_COLFAM(2), ROW_COLFAM_COLQUAL(3), ROW_COLFAM_COLQUAL_COLVIS(4), ROW_COLFAM_COLQUAL_COLVIS_TIME(5), ROW_COLFAM_COLQUAL_COLVIS_TIME_DEL(6) // everything
-                                                                                                                                                       // with
-                                                                                                                                                       // delete
-                                                                                                                                                       // flag
+  ROW(1), ROW_COLFAM(2), ROW_COLFAM_COLQUAL(3), ROW_COLFAM_COLQUAL_COLVIS(4), ROW_COLFAM_COLQUAL_COLVIS_TIME(5),
+  //everything with delete flag
+  ROW_COLFAM_COLQUAL_COLVIS_TIME_DEL(6) 
   ;
   
   int depth;

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java Wed Jan 23 20:51:59 2013
@@ -642,6 +642,7 @@ public class RFile {
     private void _seek(Range range) throws IOException {
       
       this.range = range;
+      this.checkRange = true;
       
       if (blockCount == 0) {
         // its an empty file

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java Wed Jan 23 20:51:59 2013
@@ -190,7 +190,7 @@ public class AggregatingIterator impleme
     for (Entry<String,String> entry : options.entrySet()) {
       String classname = entry.getValue();
       if (classname == null)
-        return false;
+        throw new IllegalArgumentException("classname null");
       Class<? extends Aggregator> clazz;
       try {
         clazz = AccumuloVFSClassLoader.loadClass(classname, Aggregator.class);

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java Wed Jan 23 20:51:59 2013
@@ -35,13 +35,13 @@ import org.apache.accumulo.core.iterator
 import org.apache.log4j.Logger;
 
 /**
- * A SortedKeyValueIterator that combines the Values for different versions (timestamps) of a Key into a single Value. Combiner will replace one or more versions of a Key
- * and their Values with the most recent Key and a Value which is the result of the reduce method.
+ * A SortedKeyValueIterator that combines the Values for different versions (timestamps) of a Key into a single Value. Combiner will replace one or more
+ * versions of a Key and their Values with the most recent Key and a Value which is the result of the reduce method.
  * 
  * Subclasses must implement a reduce method: {@code public Value reduce(Key key, Iterator<Value> iter)}.
  * 
- * This reduce method will be passed the most recent Key and an iterator over the Values for all non-deleted versions of that Key.
- * A combiner will not combine keys that differ by more than the timestamp.
+ * This reduce method will be passed the most recent Key and an iterator over the Values for all non-deleted versions of that Key. A combiner will not combine
+ * keys that differ by more than the timestamp.
  */
 public abstract class Combiner extends WrappingIterator implements OptionDescriber {
   static final Logger log = Logger.getLogger(Combiner.class);
@@ -254,20 +254,24 @@ public abstract class Combiner extends W
   @Override
   public boolean validateOptions(Map<String,String> options) {
     if (options.containsKey(ALL_OPTION)) {
-      combineAllColumns = Boolean.parseBoolean(options.get(ALL_OPTION));
+      try {
+        combineAllColumns = Boolean.parseBoolean(options.get(ALL_OPTION));
+      } catch (Exception e) {
+        throw new IllegalArgumentException("bad boolean " + ALL_OPTION + ":" + options.get(ALL_OPTION));
+      }
       if (combineAllColumns)
         return true;
     }
     if (!options.containsKey(COLUMNS_OPTION))
-      return false;
+      throw new IllegalArgumentException("options must include " + ALL_OPTION + " or " + COLUMNS_OPTION);
     
     String encodedColumns = options.get(COLUMNS_OPTION);
     if (encodedColumns.length() == 0)
-      return false;
+      throw new IllegalArgumentException("empty columns specified in option " + COLUMNS_OPTION);
     
     for (String columns : encodedColumns.split(",")) {
       if (!ColumnSet.isValidEncoding(columns))
-        return false;
+        throw new IllegalArgumentException("invalid column encoding " + encodedColumns);
     }
     
     return true;

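The reworded Javadoc above restates the Combiner contract: subclasses implement reduce(Key, Iterator<Value>) and receive the most recent Key plus an iterator over all non-deleted versions of that Key. A minimal sketch of such a subclass, assuming values are stored as decimal strings (Accumulo's own combiners use configurable encoders instead):

import java.util.Iterator;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Combiner;

public class StringSummingCombiner extends Combiner {
  @Override
  public Value reduce(Key key, Iterator<Value> iter) {
    // Sum every non-deleted version of this key; values are assumed to be decimal strings.
    long sum = 0;
    while (iter.hasNext())
      sum += Long.parseLong(new String(iter.next().get()));
    return new Value(Long.toString(sum).getBytes());
  }
}

When attaching such a combiner via an IteratorSetting, its column scope still has to be configured (for example with Combiner.setCombineAllColumns), which is exactly what the stricter validateOptions hunk above now enforces with IllegalArgumentException instead of a silent false.
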
Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java Wed Jan 23 20:51:59 2013
@@ -101,7 +101,11 @@ public abstract class Filter extends Wra
   @Override
   public boolean validateOptions(Map<String,String> options) {
     if (options.get(NEGATE) != null) {
-      Boolean.parseBoolean(options.get(NEGATE));
+      try {
+        Boolean.parseBoolean(options.get(NEGATE));
+      } catch (Exception e) {
+        throw new IllegalArgumentException("bad boolean " + NEGATE + ":" + options.get(NEGATE));
+      }
     }
     return true;
   }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/FirstEntryInRowIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/FirstEntryInRowIterator.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/FirstEntryInRowIterator.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/iterators/FirstEntryInRowIterator.java Wed Jan 23 20:51:59 2013
@@ -92,8 +92,7 @@ public class FirstEntryInRowIterator ext
         Key nextKey = getSource().getTopKey().followingKey(PartialKey.ROW);
         if (!latestRange.afterEndKey(nextKey))
           getSource().seek(new Range(nextKey, true, latestRange.getEndKey(), latestRange.isEndKeyInclusive()), latestColumnFamilies, latestInclusive);
-        else
-        {
+        else {
           finished = true;
           break;
         }
@@ -103,13 +102,12 @@ public class FirstEntryInRowIterator ext
   }
   
   private boolean finished = true;
-
+  
   @Override
-  public boolean hasTop()
-  {
+  public boolean hasTop() {
     return !finished && getSource().hasTop();
   }
-
+  
   @Override
   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
     // save parameters for future internal seeks
@@ -122,7 +120,7 @@ public class FirstEntryInRowIterator ext
     Range seekRange = new Range(startKey == null ? null : new Key(startKey.getRow()), true, range.getEndKey(), range.isEndKeyInclusive());
     super.seek(seekRange, columnFamilies, inclusive);
     finished = false;
-
+    
     if (getSource().hasTop()) {
       lastRowFound = getSource().getTopKey().getRow();
       if (range.beforeStartKey(getSource().getTopKey()))
@@ -143,11 +141,12 @@ public class FirstEntryInRowIterator ext
   public boolean validateOptions(Map<String,String> options) {
     try {
       String o = options.get(NUM_SCANS_STRING_NAME);
-      Integer i = o == null ? 10 : Integer.parseInt(o);
-      return i != null;
+      if (o != null)
+        Integer.parseInt(o);
     } catch (Exception e) {
-      return false;
+      throw new IllegalArgumentException("bad integer " + NUM_SCANS_STRING_NAME + ":" + options.get(NUM_SCANS_STRING_NAME), e);
     }
+    return true;
   }
   
 }


