accumulo-commits mailing list archives

From: ctubb...@apache.org
Subject: svn commit: r1437073 [2/3] - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/cli/ core/src/main/java/org/apache/accumulo/core/client/mapreduce/ core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ core/src/test/java/org/ap...
Date: Tue, 22 Jan 2013 18:07:17 GMT
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Tue Jan 22 18:07:17 2013
@@ -16,12 +16,8 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.math.BigInteger;
@@ -32,7 +28,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -57,25 +52,20 @@ import org.apache.accumulo.core.client.Z
 import org.apache.accumulo.core.client.impl.OfflineScanner;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.mapreduce.util.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.mock.MockTabletLocator;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
-import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -98,336 +88,212 @@ import org.apache.log4j.Logger;
  * <p>
  * See {@link AccumuloInputFormat} for an example implementation.
  */
-
 public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
-  protected static final Logger log = Logger.getLogger(InputFormatBase.class);
-  
-  /**
-   * Prefix shared by all job configuration property names for this class
-   */
-  private static final String PREFIX = AccumuloInputFormat.class.getSimpleName();
   
-  /**
-   * Used to limit the times a job can be configured with input information to 1
-   * 
-   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
-   * @see #getUsername(JobContext)
-   * @see #getPassword(JobContext)
-   * @see #getTablename(JobContext)
-   * @see #getAuthorizations(JobContext)
-   */
-  private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
+  private static final Class<?> CLASS = AccumuloInputFormat.class;
+  protected static final Logger log = Logger.getLogger(CLASS);
   
   /**
-   * Used to limit the times a job can be configured with instance information to 1
+   * Sets the connector information needed to communicate with Accumulo in this job.
    * 
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #setMockInstance(Job, String)
-   * @see #getInstance(JobContext)
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param user
+   *          a valid Accumulo user name (the user must be able to read the input table)
+   * @param passwd
+   *          the user's password
+   * @since 1.5.0
    */
-  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
+  public static void setConnectorInfo(Job job, String user, byte[] passwd) {
+    InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), user, passwd);
+  }
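
For orientation, the new static setters compose like this in a driver. A minimal sketch, with invented class, table, host, and credential names (mapper wiring elided):

    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ScanJobDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        job.setJobName("accumulo-scan-example");
        job.setInputFormatClass(AccumuloInputFormat.class);

        // The password bytes are Base64 encoded into the job configuration and are
        // visible to anything that can read that configuration; this is not secure.
        AccumuloInputFormat.setConnectorInfo(job, "reader", "readerPass".getBytes());
        AccumuloInputFormat.setZooKeeperInstance(job, "myInstance", "zk1:2181,zk2:2181");
        AccumuloInputFormat.setInputTableName(job, "mytable");
        AccumuloInputFormat.setScanAuthorizations(job, new Authorizations("public"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }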
   
   /**
-   * Key for storing the Accumulo user's name
+   * Determines if the connector has been configured.
    * 
-   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
-   * @see #getUsername(JobContext)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the connector has been configured, false otherwise
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, String, byte[])
    */
-  private static final String USERNAME = PREFIX + ".username";
+  protected static Boolean isConnectorInfoSet(JobContext context) {
+    return InputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration());
+  }
   
   /**
-   * Key for storing the Accumulo user's password
+   * Gets the user name from the configuration.
    * 
-   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
-   * @see #getPassword(JobContext)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the user name
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, String, byte[])
    */
-  private static final String PASSWORD = PREFIX + ".password";
+  protected static String getUsername(JobContext context) {
+    return InputConfigurator.getUsername(CLASS, context.getConfiguration());
+  }
   
   /**
-   * Key for storing the source table's name
+   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; it is Base64 encoded to
+   * provide a charset-safe conversion to a string, and is not intended to be secure.
    * 
-   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
-   * @see #getTablename(JobContext)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the decoded user password
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, String, byte[])
    */
-  private static final String TABLE_NAME = PREFIX + ".tablename";
+  protected static byte[] getPassword(JobContext context) {
+    return InputConfigurator.getPassword(CLASS, context.getConfiguration());
+  }
   
   /**
-   * Key for storing the authorizations to use to scan
+   * Configures a {@link ZooKeeperInstance} for this job.
    * 
-   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
-   * @see #getAuthorizations(JobContext)
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param instanceName
+   *          the Accumulo instance name
+   * @param zooKeepers
+   *          a comma-separated list of zookeeper servers
+   * @since 1.5.0
    */
-  private static final String AUTHORIZATIONS = PREFIX + ".authorizations";
+  public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
+    InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
+  }
   
   /**
-   * Key for storing the Accumulo instance name to connect to
+   * Configures a {@link MockInstance} for this job.
    * 
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #setMockInstance(Job, String)
-   * @see #getInstance(JobContext)
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param instanceName
+   *          the Accumulo instance name
+   * @since 1.5.0
    */
-  private static final String INSTANCE_NAME = PREFIX + ".instanceName";
+  public static void setMockInstance(Job job, String instanceName) {
+    InputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName);
+  }
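
A sketch of how setMockInstance pairs with MockInstance in a test, continuing the driver sketch above (instance, user, and table names are made up; test data would be loaded with a BatchWriter before running the job):

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.mock.MockInstance;

    static void configureForTest(Job job) throws Exception {
      // Create the in-memory instance and some test state under a known name.
      MockInstance mock = new MockInstance("testInstance");
      Connector conn = mock.getConnector("root", new byte[0]);
      conn.tableOperations().create("testtable");
      // ... write test mutations with a BatchWriter here ...

      // Point the job at the same instance by name; no ZooKeepers involved.
      AccumuloInputFormat.setMockInstance(job, "testInstance");
      AccumuloInputFormat.setConnectorInfo(job, "root", new byte[0]);
      AccumuloInputFormat.setInputTableName(job, "testtable");
    }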
   
   /**
-   * Key for storing the set of Accumulo zookeeper servers to communicate with
+   * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return an Accumulo instance
+   * @since 1.5.0
    * @see #setZooKeeperInstance(Job, String, String)
-   * @see #getInstance(JobContext)
-   */
-  private static final String ZOOKEEPERS = PREFIX + ".zooKeepers";
-  
-  /**
-   * Key for storing the directive to use the mock instance type
-   * 
    * @see #setMockInstance(Job, String)
-   * @see #getInstance(JobContext)
-   */
-  private static final String MOCK = PREFIX + ".useMockInstance";
-  
-  /**
-   * Key for storing the set of ranges over which to query
-   * 
-   * @see #setRanges(Job, Collection)
-   * @see #getRanges(JobContext)
-   */
-  private static final String RANGES = PREFIX + ".ranges";
-  
-  /**
-   * Key for storing whether to auto adjust ranges by merging overlapping ranges and splitting them on tablet boundaries
-   * 
-   * @see #setAutoAdjustRanges(Job, boolean)
-   * @see #getAutoAdjustRanges(JobContext)
-   */
-  private static final String AUTO_ADJUST_RANGES = PREFIX + ".ranges.autoAdjust";
-  
-  /**
-   * Key for storing the set of columns to query
-   * 
-   * @see #fetchColumns(Job, Collection)
-   * @see #getFetchedColumns(JobContext)
-   */
-  private static final String COLUMNS = PREFIX + ".columns";
-  
-  /**
-   * Key for storing the desired logging level
-   * 
-   * @see #setLogLevel(Job, Level)
-   * @see #getLogLevel(JobContext)
-   */
-  private static final String LOGLEVEL = PREFIX + ".loglevel";
-  
-  /**
-   * Key for storing whether the scanner to use is {@link IsolatedScanner}
-   * 
-   * @see #setScanIsolation(Job, boolean)
-   * @see #isIsolated(JobContext)
-   */
-  private static final String ISOLATED = PREFIX + ".isolated";
-  
-  /**
-   * Key for storing whether iterators are local
-   * 
-   * @see #setLocalIterators(Job, boolean)
-   * @see #usesLocalIterators(JobContext)
    */
-  private static final String LOCAL_ITERATORS = PREFIX + ".localiters";
-  
-  /**
-   * Key for storing the scan-time iterators to use
-   * 
-   * @see #addIterator(Job, IteratorSetting)
-   * @see #getIterators(JobContext)
-   */
-  private static final String ITERATORS = PREFIX + ".iterators";
-  
-  /**
-   * Constant for separating serialized {@link IteratorSetting} configuration in the job
-   * 
-   * @see #addIterator(Job, IteratorSetting)
-   * @see #getIterators(JobContext)
-   */
-  private static final String ITERATORS_DELIM = ",";
-  
-  /**
-   * Key for storing whether to read a table's files while it is offline
-   * 
-   * @see #setOfflineTableScan(Job, boolean)
-   * @see #isOfflineScan(JobContext)
-   */
-  private static final String READ_OFFLINE = PREFIX + ".read.offline";
+  protected static Instance getInstance(JobContext context) {
+    return InputConfigurator.getInstance(CLASS, context.getConfiguration());
+  }
   
   /**
-   * Controls the use of the {@link IsolatedScanner} in this job.
-   * 
-   * <p>
-   * By default, this feature is <b>disabled</b>.
+   * Sets the log level for this job.
    * 
    * @param job
    *          the Hadoop job instance to be configured
-   * @param enableFeature
-   *          the feature is enabled if true, disabled otherwise
+   * @param level
+   *          the logging level
    * @since 1.5.0
    */
-  public static void setScanIsolation(Job job, boolean enableFeature) {
-    job.getConfiguration().setBoolean(ISOLATED, enableFeature);
+  public static void setLogLevel(Job job, Level level) {
+    InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
   }
   
   /**
-   * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
-   * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
-   * 
-   * <p>
-   * By default, this feature is <b>disabled</b>.
+   * Gets the log level from this configuration.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param enableFeature
-   *          the feature is enabled if true, disabled otherwise
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the log level
    * @since 1.5.0
+   * @see #setLogLevel(Job, Level)
    */
-  public static void setLocalIterators(Job job, boolean enableFeature) {
-    job.getConfiguration().setBoolean(LOCAL_ITERATORS, enableFeature);
+  protected static Level getLogLevel(JobContext context) {
+    return InputConfigurator.getLogLevel(CLASS, context.getConfiguration());
   }
   
   /**
-   * Sets the minimum information needed to query Accumulo in this job.
+   * Sets the name of the input table over which this job will scan.
    * 
    * @param job
    *          the Hadoop job instance to be configured
-   * @param user
-   *          a valid Accumulo user name
-   * @param passwd
-   *          the user's password
-   * @param table
-   *          the table from which to read
-   * @param auths
-   *          the authorizations used to restrict data read
+   * @param tableName
+   *          the table from which this job will read
    * @since 1.5.0
    */
-  public static void setInputInfo(Job job, String user, byte[] passwd, String table, Authorizations auths) {
-    if (job.getConfiguration().getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Input info can only be set once per job");
-    job.getConfiguration().setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(user, passwd, table);
-    job.getConfiguration().set(USERNAME, user);
-    job.getConfiguration().set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    job.getConfiguration().set(TABLE_NAME, table);
-    if (auths != null && !auths.isEmpty())
-      job.getConfiguration().set(AUTHORIZATIONS, auths.serialize());
+  public static void setInputTableName(Job job, String tableName) {
+    InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
   }
   
   /**
-   * Configures a {@link ZooKeeperInstance} for this job.
+   * Gets the table name from the configuration.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param instanceName
-   *          the Accumulo instance name
-   * @param zooKeepers
-   *          a comma-separated list of zookeeper servers
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the table name
    * @since 1.5.0
+   * @see #setInputTableName(Job, String)
    */
-  public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
-    if (job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per job");
-    job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(instanceName, zooKeepers);
-    job.getConfiguration().set(INSTANCE_NAME, instanceName);
-    job.getConfiguration().set(ZOOKEEPERS, zooKeepers);
+  protected static String getInputTableName(JobContext context) {
+    return InputConfigurator.getInputTableName(CLASS, context.getConfiguration());
   }
   
   /**
-   * Configures a {@link MockInstance} for this job.
+   * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations. Defaults to the empty set.
    * 
    * @param job
    *          the Hadoop job instance to be configured
-   * @param instanceName
-   *          the Accumulo instance name
+   * @param auths
+   *          the user's authorizations
    * @since 1.5.0
    */
-  public static void setMockInstance(Job job, String instanceName) {
-    job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    job.getConfiguration().setBoolean(MOCK, true);
-    job.getConfiguration().set(INSTANCE_NAME, instanceName);
+  public static void setScanAuthorizations(Job job, Authorizations auths) {
+    InputConfigurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths);
   }
   
   /**
-   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
+   * Gets the scan authorizations from the configuration.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param ranges
-   *          the ranges that will be mapped over
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the Accumulo scan authorizations
    * @since 1.5.0
+   * @see #setScanAuthorizations(Job, Authorizations)
    */
-  public static void setRanges(Job job, Collection<Range> ranges) {
-    ArgumentChecker.notNull(ranges);
-    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
-    try {
-      for (Range r : ranges) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        r.write(new DataOutputStream(baos));
-        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
-      }
-    } catch (IOException ex) {
-      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
-    }
-    job.getConfiguration().setStrings(RANGES, rangeStrings.toArray(new String[0]));
+  protected static Authorizations getScanAuthorizations(JobContext context) {
+    return InputConfigurator.getScanAuthorizations(CLASS, context.getConfiguration());
   }
   
   /**
-   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
-   * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. *
-   * 
-   * <p>
-   * By default, this feature is <b>enabled</b>.
+   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
    * 
    * @param job
    *          the Hadoop job instance to be configured
-   * @param enableFeature
-   *          the feature is enabled if true, disabled otherwise
-   * @see #setRanges(Job, Collection)
+   * @param ranges
+   *          the ranges that will be mapped over
    * @since 1.5.0
    */
-  public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
-    job.getConfiguration().setBoolean(AUTO_ADJUST_RANGES, enableFeature);
+  public static void setRanges(Job job, Collection<Range> ranges) {
+    InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
   }
   
   /**
-   * <p>
-   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
-   * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
-   * fail.
-   * 
-   * <p>
-   * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
-   * 
-   * <p>
-   * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
-   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
-   * 
-   * <p>
-   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
-   * reduce over the data many times, it may be better to the compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
-   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
-   * 
-   * <p>
-   * There are two possible advantages to reading a tables file directly out of HDFS. First, you may see better read performance. Second, it will support
-   * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
-   * 
-   * <p>
-   * By default, this feature is <b>disabled</b>.
+   * Gets the ranges to scan over from a job.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param enableFeature
-   *          the feature is enabled if true, disabled otherwise
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the ranges
+   * @throws IOException
+   *           if the ranges have been encoded improperly
    * @since 1.5.0
+   * @see #setRanges(Job, Collection)
    */
-  public static void setOfflineTableScan(Job job, boolean enableFeature) {
-    job.getConfiguration().setBoolean(READ_OFFLINE, enableFeature);
+  protected static List<Range> getRanges(JobContext context) throws IOException {
+    return InputConfigurator.getRanges(CLASS, context.getConfiguration());
   }
   
   /**
@@ -441,33 +307,20 @@ public abstract class InputFormatBase<K,
    * @since 1.5.0
    */
   public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
-    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
-    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
-    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
-      if (column.getFirst() == null)
-        throw new IllegalArgumentException("Column family can not be null");
-      
-      String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())));
-      if (column.getSecond() != null)
-        col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())));
-      columnStrings.add(col);
-    }
-    job.getConfiguration().setStrings(COLUMNS, columnStrings.toArray(new String[0]));
+    InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
   }
   
   /**
-   * Sets the log level for this job.
+   * Gets the columns to be mapped over from this job.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param level
-   *          the logging level
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return a set of columns
    * @since 1.5.0
+   * @see #fetchColumns(Job, Collection)
    */
-  public static void setLogLevel(Job job, Level level) {
-    ArgumentChecker.notNull(level);
-    log.setLevel(level);
-    job.getConfiguration().setInt(LOGLEVEL, level.toInt());
+  protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
+    return InputConfigurator.getFetchedColumns(CLASS, context.getConfiguration());
   }
   
   /**
@@ -480,213 +333,172 @@ public abstract class InputFormatBase<K,
    * @since 1.5.0
    */
   public static void addIterator(Job job, IteratorSetting cfg) {
-    // First check to see if anything has been set already
-    String iterators = job.getConfiguration().get(ITERATORS);
-    
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    String newIter;
-    try {
-      cfg.write(new DataOutputStream(baos));
-      newIter = new String(Base64.encodeBase64(baos.toByteArray()));
-      baos.close();
-    } catch (IOException e) {
-      throw new IllegalArgumentException("unable to serialize IteratorSetting");
-    }
-    
-    // No iterators specified yet, create a new string
-    if (iterators == null || iterators.isEmpty()) {
-      iterators = newIter;
-    } else {
-      // append the next iterator & reset
-      iterators = iterators.concat(ITERATORS_DELIM + newIter);
-    }
-    // Store the iterators w/ the job
-    job.getConfiguration().set(ITERATORS, iterators);
+    InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
   }
   
   /**
-   * Determines whether a configuration has isolation enabled.
+   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return true if the feature is enabled, false otherwise
+   * @return a list of iterators
    * @since 1.5.0
-   * @see #setScanIsolation(Job, boolean)
+   * @see #addIterator(Job, IteratorSetting)
    */
-  protected static boolean isIsolated(JobContext context) {
-    return context.getConfiguration().getBoolean(ISOLATED, false);
+  protected static List<IteratorSetting> getIterators(JobContext context) {
+    return InputConfigurator.getIterators(CLASS, context.getConfiguration());
   }
   
   /**
-   * Determines whether a configuration uses local iterators.
+   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
+   * Disabling this feature will cause exactly one Map task to be created for each specified range.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return true if the feature is enabled, false otherwise
-   * @since 1.5.0
-   * @see #setLocalIterators(Job, boolean)
-   */
-  protected static boolean usesLocalIterators(JobContext context) {
-    return context.getConfiguration().getBoolean(LOCAL_ITERATORS, false);
-  }
-  
-  /**
-   * Gets the user name from the configuration.
+   * <p>
+   * By default, this feature is <b>enabled</b>.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the user name
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @see #setRanges(Job, Collection)
    * @since 1.5.0
-   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
    */
-  protected static String getUsername(JobContext context) {
-    return context.getConfiguration().get(USERNAME);
+  public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
+    InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
+   * Determines whether a configuration has auto-adjust ranges enabled.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the decoded user password
+   * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
-   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
+   * @see #setAutoAdjustRanges(Job, boolean)
    */
-  protected static byte[] getPassword(JobContext context) {
-    return Base64.decodeBase64(context.getConfiguration().get(PASSWORD, "").getBytes());
+  protected static boolean getAutoAdjustRanges(JobContext context) {
+    return InputConfigurator.getAutoAdjustRanges(CLASS, context.getConfiguration());
   }
   
   /**
-   * Gets the table name from the configuration.
+   * Controls the use of the {@link IsolatedScanner} in this job.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the table name
-   * @since 1.5.0
-   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
-   */
-  protected static String getTablename(JobContext context) {
-    return context.getConfiguration().get(TABLE_NAME);
-  }
-  
-  /**
-   * Gets the authorizations to set for the scans from the configuration.
+   * <p>
+   * By default, this feature is <b>disabled</b>.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the Accumulo scan authorizations
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
-   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
    */
-  protected static Authorizations getAuthorizations(JobContext context) {
-    String authString = context.getConfiguration().get(AUTHORIZATIONS);
-    return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.getBytes());
+  public static void setScanIsolation(Job job, boolean enableFeature) {
+    InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
-   * Initializes an Accumulo {@link Instance} based on the configuration.
+   * Determines whether a configuration has isolation enabled.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return an Accumulo instance
+   * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #setMockInstance(Job, String)
+   * @see #setScanIsolation(Job, boolean)
    */
-  protected static Instance getInstance(JobContext context) {
-    if (context.getConfiguration().getBoolean(MOCK, false))
-      return new MockInstance(context.getConfiguration().get(INSTANCE_NAME));
-    return new ZooKeeperInstance(context.getConfiguration().get(INSTANCE_NAME), context.getConfiguration().get(ZOOKEEPERS));
+  protected static boolean isIsolated(JobContext context) {
+    return InputConfigurator.isIsolated(CLASS, context.getConfiguration());
   }
   
   /**
-   * Initializes an Accumulo {@link TabletLocator} based on the configuration.
+   * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
+   * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return an Accumulo tablet locator
-   * @throws TableNotFoundException
-   *           if the table name set on the configuration doesn't exist
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
    */
-  protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
-    if (context.getConfiguration().getBoolean(MOCK, false))
-      return new MockTabletLocator();
-    Instance instance = getInstance(context);
-    String username = getUsername(context);
-    byte[] password = getPassword(context);
-    String tableName = getTablename(context);
-    return TabletLocator.getInstance(instance, new AuthInfo(username, ByteBuffer.wrap(password), instance.getInstanceID()),
-        new Text(Tables.getTableId(instance, tableName)));
+  public static void setLocalIterators(Job job, boolean enableFeature) {
+    InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
-   * Gets the ranges to scan over from a job.
+   * Determines whether a configuration uses local iterators.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the ranges
-   * @throws IOException
-   *           if the ranges have been encoded improperly
+   * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
-   * @see #setRanges(Job, Collection)
+   * @see #setLocalIterators(Job, boolean)
    */
-  protected static List<Range> getRanges(JobContext context) throws IOException {
-    ArrayList<Range> ranges = new ArrayList<Range>();
-    for (String rangeString : context.getConfiguration().getStringCollection(RANGES)) {
-      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
-      Range range = new Range();
-      range.readFields(new DataInputStream(bais));
-      ranges.add(range);
-    }
-    return ranges;
+  protected static boolean usesLocalIterators(JobContext context) {
+    return InputConfigurator.usesLocalIterators(CLASS, context.getConfiguration());
   }
   
   /**
-   * Gets the columns to be mapped over from this job.
+   * <p>
+   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. Enabling it makes the MapReduce job read the
+   * table's files directly. If the table is not offline, the job will fail. If the table comes online during the MapReduce job, the job is likely to
+   * fail.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return a set of columns
+   * <p>
+   * To use this option, the MapReduce user will need read access to the Accumulo directory in HDFS.
+   * 
+   * <p>
+   * Reading the offline table will construct the scan-time iterator stack in the map process, so any iterators configured for the table will need to be
+   * on the mapper's classpath. The accumulo-site.xml may also need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS is non-standard.
+   * 
+   * <p>
+   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a MapReduce job. If you plan to
+   * MapReduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all MapReduce jobs. The
+   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
+   * 
+   * <p>
+   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it supports
+   * speculative execution better: when reading an online table, speculative execution can put more load on an already slow tablet server.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
-   * @see #fetchColumns(Job, Collection)
    */
-  protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
-    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
-    for (String col : context.getConfiguration().getStringCollection(COLUMNS)) {
-      int idx = col.indexOf(":");
-      Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes()) : Base64.decodeBase64(col.substring(0, idx).getBytes()));
-      Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes()));
-      columns.add(new Pair<Text,Text>(cf, cq));
-    }
-    return columns;
+  public static void setOfflineTableScan(Job job, boolean enableFeature) {
+    InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
-   * Determines whether a configuration has auto-adjust ranges enabled.
+   * Determines whether a configuration has the offline table scan feature enabled.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return false if the feature is disabled, true otherwise
+   * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
-   * @see #setAutoAdjustRanges(Job, boolean)
+   * @see #setOfflineTableScan(Job, boolean)
    */
-  protected static boolean getAutoAdjustRanges(JobContext context) {
-    return context.getConfiguration().getBoolean(AUTO_ADJUST_RANGES, true);
+  protected static boolean isOfflineScan(JobContext context) {
+    return InputConfigurator.isOfflineScan(CLASS, context.getConfiguration());
   }
   
   /**
-   * Gets the log level from this configuration.
+   * Initializes an Accumulo {@link TabletLocator} based on the configuration.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the log level
+   * @return an Accumulo tablet locator
+   * @throws TableNotFoundException
+   *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
-   * @see #setLogLevel(Job, Level)
    */
-  protected static Level getLogLevel(JobContext context) {
-    return Level.toLevel(context.getConfiguration().getInt(LOGLEVEL, Level.INFO.toInt()));
+  protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, context.getConfiguration());
   }
   
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -700,77 +512,7 @@ public abstract class InputFormatBase<K,
    * @since 1.5.0
    */
   protected static void validateOptions(JobContext context) throws IOException {
-    if (!context.getConfiguration().getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
-      throw new IOException("Input info has not been set.");
-    if (!context.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IOException("Instance info has not been set.");
-    // validate that we can connect as configured
-    try {
-      Connector c = getInstance(context).getConnector(getUsername(context), getPassword(context));
-      if (!c.securityOperations().authenticateUser(getUsername(context), getPassword(context)))
-        throw new IOException("Unable to authenticate user");
-      if (!c.securityOperations().hasTablePermission(getUsername(context), getTablename(context), TablePermission.READ))
-        throw new IOException("Unable to access table");
-      
-      if (!usesLocalIterators(context)) {
-        // validate that any scan-time iterators can be loaded by the the tablet servers
-        for (IteratorSetting iter : getIterators(context)) {
-          if (!c.instanceOperations().testClassLoad(iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
-            throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
-        }
-      }
-      
-    } catch (AccumuloException e) {
-      throw new IOException(e);
-    } catch (AccumuloSecurityException e) {
-      throw new IOException(e);
-    }
-  }
-  
-  /**
-   * Determines whether a configuration has the offline table scan feature enabled.
-   * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return true if the feature is enabled, false otherwise
-   * @since 1.5.0
-   * @see #setOfflineTableScan(Job, boolean)
-   */
-  protected static boolean isOfflineScan(JobContext context) {
-    return context.getConfiguration().getBoolean(READ_OFFLINE, false);
-  }
-  
-  /**
-   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
-   * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return a list of iterators
-   * @since 1.5.0
-   * @see #addIterator(Job, IteratorSetting)
-   */
-  protected static List<IteratorSetting> getIterators(JobContext context) {
-    
-    String iterators = context.getConfiguration().get(ITERATORS);
-    
-    // If no iterators are present, return an empty list
-    if (iterators == null || iterators.isEmpty())
-      return new ArrayList<IteratorSetting>();
-    
-    // Compose the set of iterators encoded in the job configuration
-    StringTokenizer tokens = new StringTokenizer(context.getConfiguration().get(ITERATORS), ITERATORS_DELIM);
-    List<IteratorSetting> list = new ArrayList<IteratorSetting>();
-    try {
-      while (tokens.hasMoreTokens()) {
-        String itstring = tokens.nextToken();
-        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes()));
-        list.add(new IteratorSetting(new DataInputStream(bais)));
-        bais.close();
-      }
-    } catch (IOException e) {
-      throw new IllegalArgumentException("couldn't decode iterator settings");
-    }
-    return list;
+    InputConfigurator.validateOptions(CLASS, context.getConfiguration());
   }
   
   /**
@@ -816,18 +558,18 @@ public abstract class InputFormatBase<K,
       Instance instance = getInstance(attempt);
       String user = getUsername(attempt);
       byte[] password = getPassword(attempt);
-      Authorizations authorizations = getAuthorizations(attempt);
+      Authorizations authorizations = getScanAuthorizations(attempt);
       
       try {
         log.debug("Creating connector with user: " + user);
         Connector conn = instance.getConnector(user, password);
-        log.debug("Creating scanner for table: " + getTablename(attempt));
+        log.debug("Creating scanner for table: " + getInputTableName(attempt));
         log.debug("Authorizations are: " + authorizations);
         if (isOfflineScan(attempt)) {
           scanner = new OfflineScanner(instance, new AuthInfo(user, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId(instance,
-              getTablename(attempt)), authorizations);
+              getInputTableName(attempt)), authorizations);
         } else {
-          scanner = conn.createScanner(getTablename(attempt), authorizations);
+          scanner = conn.createScanner(getInputTableName(attempt), authorizations);
         }
         if (isIsolated(attempt)) {
           log.info("Creating isolated scanner");
@@ -887,13 +629,13 @@ public abstract class InputFormatBase<K,
     }
   }
   
-  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(Configuration conf, String tableName, List<Range> ranges) throws TableNotFoundException,
+  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
       AccumuloException, AccumuloSecurityException {
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     
-    Instance instance = getInstance(conf);
-    Connector conn = instance.getConnector(getUsername(conf), getPassword(conf));
+    Instance instance = getInstance(context);
+    Connector conn = instance.getConnector(getUsername(context), getPassword(context));
     String tableId = Tables.getTableId(instance, tableName);
     
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
@@ -995,7 +737,7 @@ public abstract class InputFormatBase<K,
     log.setLevel(getLogLevel(context));
     validateOptions(context);
     
-    String tableName = getTablename(context);
+    String tableName = getInputTableName(context);
     boolean autoAdjust = getAutoAdjustRanges(context);
     List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
     
@@ -1009,11 +751,11 @@ public abstract class InputFormatBase<K,
     TabletLocator tl;
     try {
       if (isOfflineScan(context)) {
-        binnedRanges = binOfflineTable(context.getConfiguration(), tableName, ranges);
+        binnedRanges = binOfflineTable(context, tableName, ranges);
         while (binnedRanges == null) {
           // Some tablets were still online, try again
           UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-          binnedRanges = binOfflineTable(context.getConfiguration(), tableName, ranges);
+          binnedRanges = binOfflineTable(context, tableName, ranges);
         }
       } else {
         Instance instance = getInstance(context);
@@ -1205,19 +947,11 @@ public abstract class InputFormatBase<K,
   // ----------------------------------------------------------------------------------------------------
   
   /**
-   * @deprecated since 1.5.0;
-   * @see #addIterator(Configuration, IteratorSetting)
-   * @see #getIteratorOptions(Configuration)
-   */
-  @Deprecated
-  private static final String ITERATORS_OPTIONS = PREFIX + ".iterators.options";
-  
-  /**
    * @deprecated since 1.5.0; Use {@link #setScanIsolation(Job, boolean)} instead.
    */
   @Deprecated
   public static void setIsolated(Configuration conf, boolean enable) {
-    conf.setBoolean(ISOLATED, enable);
+    InputConfigurator.setScanIsolation(CLASS, conf, enable);
   }
   
   /**
@@ -1225,24 +959,18 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   public static void setLocalIterators(Configuration conf, boolean enable) {
-    conf.setBoolean(LOCAL_ITERATORS, enable);
+    InputConfigurator.setLocalIterators(CLASS, conf, enable);
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #setInputInfo(Job, String, byte[], String, Authorizations)} instead.
+   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, byte[])}, {@link #setInputTableName(Job, String)}, and
+   *             {@link #setScanAuthorizations(Job, Authorizations)} instead.
    */
   @Deprecated
   public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
-    if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Input info can only be set once per job");
-    conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(user, passwd, table);
-    conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    conf.set(TABLE_NAME, table);
-    if (auths != null && !auths.isEmpty())
-      conf.set(AUTHORIZATIONS, auths.serialize());
+    InputConfigurator.setConnectorInfo(CLASS, conf, user, passwd);
+    InputConfigurator.setInputTableName(CLASS, conf, table);
+    InputConfigurator.setScanAuthorizations(CLASS, conf, auths);
   }
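
Migration off the deprecated call is mechanical; a sketch of the before and after, where pass (a byte[]) and auths stand in for whatever the caller already had:

    // 1.4-era, deprecated:
    //   AccumuloInputFormat.setInputInfo(conf, "reader", pass, "mytable", auths);
    // 1.5 replacement, against the Job rather than the raw Configuration:
    AccumuloInputFormat.setConnectorInfo(job, "reader", pass);
    AccumuloInputFormat.setInputTableName(job, "mytable");
    AccumuloInputFormat.setScanAuthorizations(job, auths);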
   
   /**
@@ -1250,13 +978,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
-    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per job");
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(instanceName, zooKeepers);
-    conf.set(INSTANCE_NAME, instanceName);
-    conf.set(ZOOKEEPERS, zooKeepers);
+    InputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
   }
   
   /**
@@ -1264,9 +986,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   public static void setMockInstance(Configuration conf, String instanceName) {
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    conf.setBoolean(MOCK, true);
-    conf.set(INSTANCE_NAME, instanceName);
+    InputConfigurator.setMockInstance(CLASS, conf, instanceName);
   }
   
   /**
@@ -1274,18 +994,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   public static void setRanges(Configuration conf, Collection<Range> ranges) {
-    ArgumentChecker.notNull(ranges);
-    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
-    try {
-      for (Range r : ranges) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        r.write(new DataOutputStream(baos));
-        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
-      }
-    } catch (IOException ex) {
-      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
-    }
-    conf.setStrings(RANGES, rangeStrings.toArray(new String[0]));
+    InputConfigurator.setRanges(CLASS, conf, ranges);
   }
   
   /**
@@ -1293,7 +1002,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   public static void disableAutoAdjustRanges(Configuration conf) {
-    conf.setBoolean(AUTO_ADJUST_RANGES, false);
+    InputConfigurator.setAutoAdjustRanges(CLASS, conf, false);
   }
   
   /**
@@ -1301,14 +1010,9 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
-    if (maxVersions < 1)
-      throw new IOException("Invalid maxVersions: " + maxVersions + ".  Must be >= 1");
-    // Check to make sure its a legit value
-    if (maxVersions >= 1) {
-      IteratorSetting vers = new IteratorSetting(0, "vers", VersioningIterator.class);
-      VersioningIterator.setMaxVersions(vers, maxVersions);
-      addIterator(conf, vers);
-    }
+    IteratorSetting vers = new IteratorSetting(0, "vers", VersioningIterator.class);
+    VersioningIterator.setMaxVersions(vers, maxVersions);
+    InputConfigurator.addIterator(CLASS, conf, vers);
   }
   
   /**
@@ -1316,7 +1020,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   public static void setScanOffline(Configuration conf, boolean scanOff) {
-    conf.setBoolean(READ_OFFLINE, scanOff);
+    InputConfigurator.setOfflineTableScan(CLASS, conf, scanOff);
   }
   
   /**
@@ -1324,18 +1028,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
-    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
-    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
-    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
-      if (column.getFirst() == null)
-        throw new IllegalArgumentException("Column family can not be null");
-      
-      String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())));
-      if (column.getSecond() != null)
-        col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())));
-      columnStrings.add(col);
-    }
-    conf.setStrings(COLUMNS, columnStrings.toArray(new String[0]));
+    InputConfigurator.fetchColumns(CLASS, conf, columnFamilyColumnQualifierPairs);
   }
   
   /**
@@ -1343,9 +1036,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   public static void setLogLevel(Configuration conf, Level level) {
-    ArgumentChecker.notNull(level);
-    log.setLevel(level);
-    conf.setInt(LOGLEVEL, level.toInt());
+    InputConfigurator.setLogLevel(CLASS, conf, level);
   }
   
   /**
@@ -1353,35 +1044,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   public static void addIterator(Configuration conf, IteratorSetting cfg) {
-    // First check to see if anything has been set already
-    String iterators = conf.get(ITERATORS);
-    
-    // No iterators specified yet, create a new string
-    if (iterators == null || iterators.isEmpty()) {
-      iterators = new AccumuloIterator(cfg.getPriority(), cfg.getIteratorClass(), cfg.getName()).toString();
-    } else {
-      // append the next iterator & reset
-      iterators = iterators.concat(ITERATORS_DELIM + new AccumuloIterator(cfg.getPriority(), cfg.getIteratorClass(), cfg.getName()).toString());
-    }
-    // Store the iterators w/ the job
-    conf.set(ITERATORS, iterators);
-    for (Entry<String,String> entry : cfg.getOptions().entrySet()) {
-      if (entry.getValue() == null)
-        continue;
-      
-      String iteratorOptions = conf.get(ITERATORS_OPTIONS);
-      
-      // No options specified yet, create a new string
-      if (iteratorOptions == null || iteratorOptions.isEmpty()) {
-        iteratorOptions = new AccumuloIteratorOption(cfg.getName(), entry.getKey(), entry.getValue()).toString();
-      } else {
-        // append the next option & reset
-        iteratorOptions = iteratorOptions.concat(ITERATORS_DELIM + new AccumuloIteratorOption(cfg.getName(), entry.getKey(), entry.getValue()));
-      }
-      
-      // Store the options w/ the job
-      conf.set(ITERATORS_OPTIONS, iteratorOptions);
-    }
+    InputConfigurator.addIterator(CLASS, conf, cfg);
   }
   
   /**
@@ -1389,7 +1052,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static boolean isIsolated(Configuration conf) {
-    return conf.getBoolean(ISOLATED, false);
+    return InputConfigurator.isIsolated(CLASS, conf);
   }
   
   /**
@@ -1397,7 +1060,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static boolean usesLocalIterators(Configuration conf) {
-    return conf.getBoolean(LOCAL_ITERATORS, false);
+    return InputConfigurator.usesLocalIterators(CLASS, conf);
   }
   
   /**
@@ -1405,7 +1068,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static String getUsername(Configuration conf) {
-    return conf.get(USERNAME);
+    return InputConfigurator.getUsername(CLASS, conf);
   }
   
   /**
@@ -1413,24 +1076,23 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+    return InputConfigurator.getPassword(CLASS, conf);
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #getTablename(JobContext)} instead.
+   * @deprecated since 1.5.0; Use {@link #getInputTableName(JobContext)} instead.
    */
   @Deprecated
   protected static String getTablename(Configuration conf) {
-    return conf.get(TABLE_NAME);
+    return InputConfigurator.getInputTableName(CLASS, conf);
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #getAuthorizations(JobContext)} instead.
+   * @deprecated since 1.5.0; Use {@link #getScanAuthorizations(JobContext)} instead.
    */
   @Deprecated
   protected static Authorizations getAuthorizations(Configuration conf) {
-    String authString = conf.get(AUTHORIZATIONS);
-    return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.split(","));
+    return InputConfigurator.getScanAuthorizations(CLASS, conf);
   }
   
   /**
@@ -1438,9 +1100,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static Instance getInstance(Configuration conf) {
-    if (conf.getBoolean(MOCK, false))
-      return new MockInstance(conf.get(INSTANCE_NAME));
-    return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+    return InputConfigurator.getInstance(CLASS, conf);
   }
   
   /**
@@ -1448,14 +1108,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
-    if (conf.getBoolean(MOCK, false))
-      return new MockTabletLocator();
-    Instance instance = getInstance(conf);
-    String username = getUsername(conf);
-    byte[] password = getPassword(conf);
-    String tableName = getTablename(conf);
-    return TabletLocator.getInstance(instance, new AuthInfo(username, ByteBuffer.wrap(password), instance.getInstanceID()),
-        new Text(Tables.getTableId(instance, tableName)));
+    return InputConfigurator.getTabletLocator(CLASS, conf);
   }
   
   /**
@@ -1463,14 +1116,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static List<Range> getRanges(Configuration conf) throws IOException {
-    ArrayList<Range> ranges = new ArrayList<Range>();
-    for (String rangeString : conf.getStringCollection(RANGES)) {
-      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
-      Range range = new Range();
-      range.readFields(new DataInputStream(bais));
-      ranges.add(range);
-    }
-    return ranges;
+    return InputConfigurator.getRanges(CLASS, conf);
   }
   
   /**
@@ -1478,14 +1124,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
-    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
-    for (String col : conf.getStringCollection(COLUMNS)) {
-      int idx = col.indexOf(":");
-      Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes()) : Base64.decodeBase64(col.substring(0, idx).getBytes()));
-      Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes()));
-      columns.add(new Pair<Text,Text>(cf, cq));
-    }
-    return columns;
+    return InputConfigurator.getFetchedColumns(CLASS, conf);
   }
   
   /**
@@ -1493,7 +1132,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static boolean getAutoAdjustRanges(Configuration conf) {
-    return conf.getBoolean(AUTO_ADJUST_RANGES, true);
+    return InputConfigurator.getAutoAdjustRanges(CLASS, conf);
   }
   
   /**
@@ -1501,7 +1140,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static Level getLogLevel(Configuration conf) {
-    return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
+    return InputConfigurator.getLogLevel(CLASS, conf);
   }
   
   /**
@@ -1509,31 +1148,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static void validateOptions(Configuration conf) throws IOException {
-    if (!conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
-      throw new IOException("Input info has not been set.");
-    if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IOException("Instance info has not been set.");
-    // validate that we can connect as configured
-    try {
-      Connector c = getInstance(conf).getConnector(getUsername(conf), getPassword(conf));
-      if (!c.securityOperations().authenticateUser(getUsername(conf), getPassword(conf)))
-        throw new IOException("Unable to authenticate user");
-      if (!c.securityOperations().hasTablePermission(getUsername(conf), getTablename(conf), TablePermission.READ))
-        throw new IOException("Unable to access table");
-      
-      if (!usesLocalIterators(conf)) {
-        // validate that any scan-time iterators can be loaded by the the tablet servers
-        for (AccumuloIterator iter : getIterators(conf)) {
-          if (!c.instanceOperations().testClassLoad(iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
-            throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
-        }
-      }
-      
-    } catch (AccumuloException e) {
-      throw new IOException(e);
-    } catch (AccumuloSecurityException e) {
-      throw new IOException(e);
-    }
+    InputConfigurator.validateOptions(CLASS, conf);
   }
   
   /**
@@ -1541,6 +1156,17 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static int getMaxVersions(Configuration conf) {
+    // This is convoluted because the only reason to read maxVersions is to reconstruct the same kind of IteratorSetting we must
+    // deconstruct to reach the option in the first place; it is kept this way to preserve the deprecated method's behavior.
+    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
+    for (IteratorSetting setting : iteratorSettings) {
+      if ("vers".equals(setting.getName()) && 0 == setting.getPriority() && VersioningIterator.class.getName().equals(setting.getIteratorClass())) {
+        if (setting.getOptions().containsKey("maxVersions"))
+          return Integer.parseInt(setting.getOptions().get("maxVersions"));
+        else
+          return -1;
+      }
+    }
     return -1;
   }
   
@@ -1549,7 +1175,7 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static boolean isOfflineScan(Configuration conf) {
-    return conf.getBoolean(READ_OFFLINE, false);
+    return InputConfigurator.isOfflineScan(CLASS, conf);
   }
   
   /**
@@ -1557,21 +1183,14 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static List<AccumuloIterator> getIterators(Configuration conf) {
-    
-    String iterators = conf.get(ITERATORS);
-    
-    // If no iterators are present, return an empty list
-    if (iterators == null || iterators.isEmpty())
-      return new ArrayList<AccumuloIterator>();
-    
-    // Compose the set of iterators encoded in the job configuration
-    StringTokenizer tokens = new StringTokenizer(conf.get(ITERATORS), ITERATORS_DELIM);
-    List<AccumuloIterator> list = new ArrayList<AccumuloIterator>();
-    while (tokens.hasMoreTokens()) {
-      String itstring = tokens.nextToken();
-      list.add(new AccumuloIterator(itstring));
+    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
+    List<AccumuloIterator> deprecatedIterators = new ArrayList<AccumuloIterator>(iteratorSettings.size());
+    for (IteratorSetting setting : iteratorSettings) {
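+      // Re-encode each IteratorSetting as the legacy "priority<FIELD_SEP>class<FIELD_SEP>name" string the old API exposed.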
+      AccumuloIterator deprecatedIter = new AccumuloIterator(setting.getPriority() + AccumuloIterator.FIELD_SEP + setting.getIteratorClass()
+          + AccumuloIterator.FIELD_SEP + setting.getName());
+      deprecatedIterators.add(deprecatedIter);
     }
-    return list;
+    return deprecatedIterators;
   }
   
   /**
@@ -1579,20 +1198,21 @@ public abstract class InputFormatBase<K,
    */
   @Deprecated
   protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) {
-    String iteratorOptions = conf.get(ITERATORS_OPTIONS);
-    
-    // If no options are present, return an empty list
-    if (iteratorOptions == null || iteratorOptions.isEmpty())
-      return new ArrayList<AccumuloIteratorOption>();
-    
-    // Compose the set of options encoded in the job configuration
-    StringTokenizer tokens = new StringTokenizer(conf.get(ITERATORS_OPTIONS), ITERATORS_DELIM);
-    List<AccumuloIteratorOption> list = new ArrayList<AccumuloIteratorOption>();
-    while (tokens.hasMoreTokens()) {
-      String optionString = tokens.nextToken();
-      list.add(new AccumuloIteratorOption(optionString));
+    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
+    List<AccumuloIteratorOption> deprecatedIteratorOptions = new ArrayList<AccumuloIteratorOption>(iteratorSettings.size());
+    for (IteratorSetting setting : iteratorSettings) {
+      for (Entry<String,String> opt : setting.getOptions().entrySet()) {
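+        // Re-encode each option as the legacy "name<FIELD_SEP>key<FIELD_SEP>value" string, URL-encoding key and value.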
+        String deprecatedOption;
+        try {
+          deprecatedOption = setting.getName() + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getKey(), "UTF-8")
+              + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getValue(), "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+          throw new RuntimeException(e);
+        }
+        deprecatedIteratorOptions.add(new AccumuloIteratorOption(deprecatedOption));
+      }
     }
-    return list;
+    return deprecatedIteratorOptions;
   }
   
   /**

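A minimal sketch of the round trip the deprecated shims above preserve; the IteratorSetting below mirrors the one the old setMaxVersions() path is presumed to store (name "vers", priority 0, VersioningIterator), and the <FIELD_SEP> rendering is illustrative since the separator constants are defined elsewhere and not shown in this diff:

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.iterators.user.VersioningIterator;

    // Hypothetical setting, mirroring what the deprecated setMaxVersions path
    // is presumed to store: name "vers", priority 0, VersioningIterator.
    IteratorSetting vers = new IteratorSetting(0, "vers", VersioningIterator.class);
    vers.addOption("maxVersions", "3");
    // getMaxVersions(conf) above matches on (name, priority, class) and parses
    // the option back out; getIterators(conf) re-encodes the same setting as the
    // legacy "priority<FIELD_SEP>class<FIELD_SEP>name" string.
    int maxVersions = Integer.parseInt(vers.getOptions().get("maxVersions")); // 3
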
Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java?rev=1437073&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java Tue Jan 22 18:07:17 2013
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce.util;
+
+import java.nio.charset.Charset;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * @since 1.5.0
+ */
+public class ConfiguratorBase {
+  
+  /**
+   * Configuration keys for {@link Instance#getConnector(String, byte[])}.
+   * 
+   * @since 1.5.0
+   */
+  public static enum ConnectorInfo {
+    IS_CONFIGURED, USER_NAME, PASSWORD
+  }
+  
+  /**
+   * Configuration keys for {@link Instance}, {@link ZooKeeperInstance}, and {@link MockInstance}.
+   * 
+   * @since 1.5.0
+   */
+  protected static enum InstanceOpts {
+    TYPE, NAME, ZOO_KEEPERS;
+  }
+  
+  /**
+   * Configuration keys for general configuration options.
+   * 
+   * @since 1.5.0
+   */
+  protected static enum GeneralOpts {
+    LOG_LEVEL
+  }
+  
+  /**
+   * Provides a configuration key for a given feature enum, prefixed by the implementingClass.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param e
+   *          the enum used to provide the unique part of the configuration key
+   * @return the configuration key
+   * @since 1.5.0
+   */
+  protected static String enumToConfKey(Class<?> implementingClass, Enum<?> e) {
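+    // e.g., (AccumuloInputFormat.class, ConnectorInfo.USER_NAME) -> "AccumuloInputFormat.ConnectorInfo.UserName"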
+    return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "." + StringUtils.camelize(e.name().toLowerCase());
+  }
+  
+  /**
+   * Sets the connector information needed to communicate with Accumulo in this job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param user
+   *          a valid Accumulo user name
+   * @param passwd
+   *          the user's password
+   * @since 1.5.0
+   */
+  public static void setConnectorInfo(Class<?> implementingClass, Configuration conf, String user, byte[] passwd) {
+    if (isConnectorInfoSet(implementingClass, conf))
+      throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job");
+    
+    ArgumentChecker.notNull(user, passwd);
+    conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
+    conf.set(enumToConfKey(implementingClass, ConnectorInfo.USER_NAME), user);
+    conf.set(enumToConfKey(implementingClass, ConnectorInfo.PASSWORD), new String(Base64.encodeBase64(passwd), Charset.forName("UTF-8")));
+  }
+  
+  /**
+   * Determines if the connector info has already been set for this instance.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return true if the connector info has already been set, false otherwise
+   * @since 1.5.0
+   */
+  public static Boolean isConnectorInfoSet(Class<?> implementingClass, Configuration conf) {
+    return conf.getBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), false);
+  }
+  
+  /**
+   * Gets the user name from the configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the user name
+   * @since 1.5.0
+   * @see #setConnectorInfo(Class, Configuration, String, byte[])
+   */
+  public static String getUsername(Class<?> implementingClass, Configuration conf) {
+    return conf.get(enumToConfKey(implementingClass, ConnectorInfo.USER_NAME));
+  }
+  
+  /**
+   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; it is Base64 encoded to
+   * provide a charset-safe conversion to a string, and is not intended to be secure.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the decoded user password
+   * @since 1.5.0
+   * @see #setConnectorInfo(Class, Configuration, String, byte[])
+   */
+  public static byte[] getPassword(Class<?> implementingClass, Configuration conf) {
+    return Base64.decodeBase64(conf.get(enumToConfKey(implementingClass, ConnectorInfo.PASSWORD), "").getBytes(Charset.forName("UTF-8")));
+  }
+  
+  /**
+   * Configures a {@link ZooKeeperInstance} for this job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param instanceName
+   *          the Accumulo instance name
+   * @param zooKeepers
+   *          a comma-separated list of zookeeper servers
+   * @since 1.5.0
+   */
+  public static void setZooKeeperInstance(Class<?> implementingClass, Configuration conf, String instanceName, String zooKeepers) {
+    String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
+    if (!conf.get(key, "").isEmpty())
+      throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key));
+    conf.set(key, "ZooKeeperInstance");
+    
+    ArgumentChecker.notNull(instanceName, zooKeepers);
+    conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName);
+    conf.set(enumToConfKey(implementingClass, InstanceOpts.ZOO_KEEPERS), zooKeepers);
+  }
+  
+  /**
+   * Configures a {@link MockInstance} for this job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param instanceName
+   *          the Accumulo instance name
+   * @since 1.5.0
+   */
+  public static void setMockInstance(Class<?> implementingClass, Configuration conf, String instanceName) {
+    String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
+    if (!conf.get(key, "").isEmpty())
+      throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key));
+    conf.set(key, "MockInstance");
+    
+    ArgumentChecker.notNull(instanceName);
+    conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName);
+  }
+  
+  /**
+   * Initializes an Accumulo {@link Instance} based on the configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return an Accumulo instance
+   * @since 1.5.0
+   * @see #setZooKeeperInstance(Class, Configuration, String, String)
+   * @see #setMockInstance(Class, Configuration, String)
+   */
+  public static Instance getInstance(Class<?> implementingClass, Configuration conf) {
+    String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE), "");
+    if ("MockInstance".equals(instanceType))
+      return new MockInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)));
+    else if ("ZooKeeperInstance".equals(instanceType))
+      return new ZooKeeperInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)), conf.get(enumToConfKey(implementingClass,
+          InstanceOpts.ZOO_KEEPERS)));
+    else if (instanceType.isEmpty())
+      throw new IllegalStateException("Instance has not been configured for " + implementingClass.getSimpleName());
+    else
+      throw new IllegalStateException("Unrecognized instance type " + instanceType);
+  }
+  
+  /**
+   * Sets the log level for this job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param level
+   *          the logging level
+   * @since 1.5.0
+   */
+  public static void setLogLevel(Class<?> implementingClass, Configuration conf, Level level) {
+    ArgumentChecker.notNull(level);
+    Logger.getLogger(implementingClass).setLevel(level);
+    conf.setInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), level.toInt());
+  }
+  
+  /**
+   * Gets the log level from this configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the log level
+   * @since 1.5.0
+   * @see #setLogLevel(Class, Configuration, Level)
+   */
+  public static Level getLogLevel(Class<?> implementingClass, Configuration conf) {
+    return Level.toLevel(conf.getInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), Level.INFO.toInt()));
+  }
+  
+}
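
A hedged usage sketch for ConfiguratorBase; AccumuloInputFormat is used as the implementingClass and all connection values are placeholders, not part of this commit:

    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.client.mapreduce.util.ConfiguratorBase;
    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Connector and instance info may each be set only once per job;
    // a second call throws IllegalStateException.
    ConfiguratorBase.setConnectorInfo(AccumuloInputFormat.class, conf, "root", "secret".getBytes());
    ConfiguratorBase.setZooKeeperInstance(AccumuloInputFormat.class, conf, "myInstance", "zk1:2181,zk2:2181");
    // Values land under class-prefixed keys like "AccumuloInputFormat.ConnectorInfo.UserName".
    String user = ConfiguratorBase.getUsername(AccumuloInputFormat.class, conf);       // "root"
    Instance instance = ConfiguratorBase.getInstance(AccumuloInputFormat.class, conf); // a ZooKeeperInstance

Prefixing every key with the implementing class's simple name is what lets multiple input and output formats share a single Configuration without colliding.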

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/FileOutputConfigurator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/FileOutputConfigurator.java?rev=1437073&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/FileOutputConfigurator.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/FileOutputConfigurator.java Tue Jan 22 18:07:17 2013
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce.util;
+
+import java.util.Arrays;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * @since 1.5.0
+ */
+public class FileOutputConfigurator extends ConfiguratorBase {
+  
+  /**
+   * Configuration keys for {@link AccumuloConfiguration}.
+   * 
+   * @since 1.5.0
+   */
+  public static enum Opts {
+    ACCUMULO_PROPERTIES;
+  }
+  
+  /**
+   * Determines whether the given Accumulo property is one of the supported properties this OutputFormat sets to change the behavior of the RecordWriter.<br />
+   * These properties correspond to the supported public static setter methods available to this class.
+   * 
+   * @param property
+   *          the Accumulo property to check
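+   * @return true if the property is supported, false otherwise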
+   * @since 1.5.0
+   */
+  protected static Boolean isSupportedAccumuloProperty(Property property) {
+    switch (property) {
+      case TABLE_FILE_COMPRESSION_TYPE:
+      case TABLE_FILE_COMPRESSED_BLOCK_SIZE:
+      case TABLE_FILE_BLOCK_SIZE:
+      case TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX:
+      case TABLE_FILE_REPLICATION:
+        return true;
+      default:
+        return false;
+    }
+  }
+  
+  /**
+   * Helper for transforming Accumulo configuration properties into something that can be stored safely inside the Hadoop Job configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param property
+   *          the supported Accumulo property
+   * @param value
+   *          the value of the property to set
+   * @since 1.5.0
+   */
+  private static <T> void setAccumuloProperty(Class<?> implementingClass, Configuration conf, Property property, T value) {
+    if (isSupportedAccumuloProperty(property)) {
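+      // Keys take the form "<ImplementingClass>.Opts.AccumuloProperties.<accumulo property key>",
+      // e.g. "AccumuloFileOutputFormat.Opts.AccumuloProperties.table.file.compress.type" (illustrative).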
+      String val = String.valueOf(value);
+      if (property.getType().isValidFormat(val))
+        conf.set(enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + "." + property.getKey(), val);
+      else
+        throw new IllegalArgumentException("Value is not appropriate for property type '" + property.getType() + "'");
+    } else
+      throw new IllegalArgumentException("Unsupported configuration property " + property.getKey());
+  }
+  
+  /**
+   * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults and overridden with any Accumulo properties that have been
+   * stored in the Job's configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @since 1.5.0
+   */
+  public static AccumuloConfiguration getAccumuloConfiguration(Class<?> implementingClass, Configuration conf) {
+    String prefix = enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + ".";
+    ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration());
+    for (Entry<String,String> entry : conf)
+      if (entry.getKey().startsWith(prefix))
+        acuConf.set(Property.getPropertyByKey(entry.getKey().substring(prefix.length())), entry.getValue());
+    return acuConf;
+  }
+  
+  /**
+   * Sets the compression type to use for data blocks. Specifying a compression may require additional libraries to be available to your Job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param compressionType
+   *          one of "none", "gz", "lzo", or "snappy"
+   * @since 1.5.0
+   */
+  public static void setCompressionType(Class<?> implementingClass, Configuration conf, String compressionType) {
+    if (compressionType == null || !Arrays.asList("none", "gz", "lzo", "snappy").contains(compressionType))
+      throw new IllegalArgumentException("Compression type must be one of: none, gz, lzo, snappy");
+    setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType);
+  }
+  
+  /**
+   * Sets the size for data blocks within each file.<br />
+   * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed as a group.
+   * 
+   * <p>
+   * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance).
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param dataBlockSize
+   *          the block size, in bytes
+   * @since 1.5.0
+   */
+  public static void setDataBlockSize(Class<?> implementingClass, Configuration conf, long dataBlockSize) {
+    setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, dataBlockSize);
+  }
+  
+  /**
+   * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param fileBlockSize
+   *          the block size, in bytes
+   * @since 1.5.0
+   */
+  public static void setFileBlockSize(Class<?> implementingClass, Configuration conf, long fileBlockSize) {
+    setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_BLOCK_SIZE, fileBlockSize);
+  }
+  
+  /**
+   * Sets the size for index blocks within each file; smaller blocks mean a deeper index hierarchy within the file, while larger blocks mean a shallower
+   * index hierarchy within the file. This can affect the performance of queries.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param indexBlockSize
+   *          the block size, in bytes
+   * @since 1.5.0
+   */
+  public static void setIndexBlockSize(Class<?> implementingClass, Configuration conf, long indexBlockSize) {
+    setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, indexBlockSize);
+  }
+  
+  /**
+   * Sets the file system replication factor for the resulting file, overriding the file system default.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param replication
+   *          the number of replicas for produced files
+   * @since 1.5.0
+   */
+  public static void setReplication(Class<?> implementingClass, Configuration conf, int replication) {
+    setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_REPLICATION, replication);
+  }
+  
+}
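
A hedged usage sketch for FileOutputConfigurator; AccumuloFileOutputFormat is assumed as the caller and the values are placeholders, not part of this commit:

    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.client.mapreduce.util.FileOutputConfigurator;
    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    FileOutputConfigurator.setCompressionType(AccumuloFileOutputFormat.class, conf, "gz");
    FileOutputConfigurator.setDataBlockSize(AccumuloFileOutputFormat.class, conf, 1L << 20); // 1 MB data blocks
    FileOutputConfigurator.setReplication(AccumuloFileOutputFormat.class, conf, 3);
    // At write time, a RecordWriter can recover the defaults plus these overrides:
    AccumuloConfiguration acuConf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, conf);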


