accumulo-commits mailing list archives

From ctubb...@apache.org
Subject svn commit: r1433745 [2/3] - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/cli/ core/src/main/java/org/apache/accumulo/core/client/mapreduce/ core/src/main/java/org/apache/accumulo/core/iterators/user/ core/src/test/java/org/apache/ac...
Date Tue, 15 Jan 2013 23:50:42 GMT
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Tue Jan 15 23:50:42 2013
@@ -23,8 +23,11 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.math.BigInteger;
 import java.net.InetAddress;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -78,157 +81,289 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
- * This class allows MapReduce jobs to use Accumulo as the source of data. This input format provides keys and values of type K and V to the Map() and Reduce()
- * functions.
- * 
- * Subclasses must implement the following method: public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws
- * IOException, InterruptedException
- * 
- * This class includes a static class that can be used to create a RecordReader: protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V>
- * 
- * Subclasses of RecordReaderBase must implement the following method: public boolean nextKeyValue() throws IOException, InterruptedException This method should
- * set the following variables: K currentK V currentV Key currentKey (used for progress reporting) int numKeysRead (used for progress reporting)
- * 
- * See AccumuloInputFormat for an example implementation.
- * 
- * Other static methods are optional
+ * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs.
+ * <p>
+ * Subclasses must implement a {@link #createRecordReader(InputSplit, TaskAttemptContext)} to provide a {@link RecordReader} for K,V.
+ * <p>
+ * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value} pairs, but one must implement its
+ * RecordReaderBase.nextKeyValue() to transform them to the desired generic types K,V.
+ * <p>
+ * See {@link AccumuloInputFormat} for an example implementation.
  */
 
 public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static final Logger log = Logger.getLogger(InputFormatBase.class);
   
+  /**
+   * Prefix shared by all job configuration property names for this class
+   */
   private static final String PREFIX = AccumuloInputFormat.class.getSimpleName();
+  
+  /**
+   * Used to ensure that input information can only be configured once per job
+   * 
+   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
+   * @see #getUsername(JobContext)
+   * @see #getPassword(JobContext)
+   * @see #getTablename(JobContext)
+   * @see #getAuthorizations(JobContext)
+   */
   private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
+  
+  /**
+   * Used to ensure that instance information can only be configured once per job
+   * 
+   * @see #setZooKeeperInstance(Job, String, String)
+   * @see #setMockInstance(Job, String)
+   * @see #getInstance(JobContext)
+   */
   private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
+  
+  /**
+   * Key for storing the Accumulo user's name
+   * 
+   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
+   * @see #getUsername(JobContext)
+   */
   private static final String USERNAME = PREFIX + ".username";
+  
+  /**
+   * Key for storing the Accumulo user's password
+   * 
+   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
+   * @see #getPassword(JobContext)
+   */
   private static final String PASSWORD = PREFIX + ".password";
+  
+  /**
+   * Key for storing the source table's name
+   * 
+   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
+   * @see #getTablename(JobContext)
+   */
   private static final String TABLE_NAME = PREFIX + ".tablename";
+  
+  /**
+   * Key for storing the authorizations to use to scan
+   * 
+   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
+   * @see #getAuthorizations(JobContext)
+   */
   private static final String AUTHORIZATIONS = PREFIX + ".authorizations";
   
+  /**
+   * Key for storing the Accumulo instance name to connect to
+   * 
+   * @see #setZooKeeperInstance(Job, String, String)
+   * @see #setMockInstance(Job, String)
+   * @see #getInstance(JobContext)
+   */
   private static final String INSTANCE_NAME = PREFIX + ".instanceName";
+  
+  /**
+   * Key for storing the set of Accumulo zookeeper servers to communicate with
+   * 
+   * @see #setZooKeeperInstance(Job, String, String)
+   * @see #getInstance(JobContext)
+   */
   private static final String ZOOKEEPERS = PREFIX + ".zooKeepers";
-  private static final String MOCK = ".useMockInstance";
   
+  /**
+   * Key for storing the directive to use the mock instance type
+   * 
+   * @see #setMockInstance(Job, String)
+   * @see #getInstance(JobContext)
+   */
+  private static final String MOCK = PREFIX + ".useMockInstance";
+  
+  /**
+   * Key for storing the set of ranges over which to query
+   * 
+   * @see #setRanges(Job, Collection)
+   * @see #getRanges(JobContext)
+   */
   private static final String RANGES = PREFIX + ".ranges";
+  
+  /**
+   * Key for storing whether to auto adjust ranges by merging overlapping ranges and splitting them on tablet boundaries
+   * 
+   * @see #setAutoAdjustRanges(Job, boolean)
+   * @see #getAutoAdjustRanges(JobContext)
+   */
   private static final String AUTO_ADJUST_RANGES = PREFIX + ".ranges.autoAdjust";
   
+  /**
+   * Key for storing the set of columns to query
+   * 
+   * @see #fetchColumns(Job, Collection)
+   * @see #getFetchedColumns(JobContext)
+   */
   private static final String COLUMNS = PREFIX + ".columns";
+  
+  /**
+   * Key for storing the desired logging level
+   * 
+   * @see #setLogLevel(Job, Level)
+   * @see #getLogLevel(JobContext)
+   */
   private static final String LOGLEVEL = PREFIX + ".loglevel";
   
+  /**
+   * Key for storing whether the scanner to use is {@link IsolatedScanner}
+   * 
+   * @see #setScanIsolation(Job, boolean)
+   * @see #isIsolated(JobContext)
+   */
   private static final String ISOLATED = PREFIX + ".isolated";
   
+  /**
+   * Key for storing whether iterators are local
+   * 
+   * @see #setLocalIterators(Job, boolean)
+   * @see #usesLocalIterators(JobContext)
+   */
   private static final String LOCAL_ITERATORS = PREFIX + ".localiters";
   
-  // Used to specify the maximum # of versions of an Accumulo cell value to return
-  private static final String MAX_VERSIONS = PREFIX + ".maxVersions";
-  
-  // Used for specifying the iterators to be applied
+  /**
+   * Key for storing the scan-time iterators to use
+   * 
+   * @see #addIterator(Job, IteratorSetting)
+   * @see #getIterators(JobContext)
+   */
   private static final String ITERATORS = PREFIX + ".iterators";
+  
+  /**
+   * Constant for separating serialized {@link IteratorSetting} configuration in the job
+   * 
+   * @see #addIterator(Job, IteratorSetting)
+   * @see #getIterators(JobContext)
+   */
   private static final String ITERATORS_DELIM = ",";
   
+  /**
+   * Key for storing whether to read a table's files while it is offline
+   * 
+   * @see #setOfflineTableScan(Job, boolean)
+   * @see #isOfflineScan(JobContext)
+   */
   private static final String READ_OFFLINE = PREFIX + ".read.offline";
   
   /**
-   * Enable or disable use of the {@link IsolatedScanner} in this configuration object. By default it is not enabled.
+   * Controls the use of the {@link IsolatedScanner} in this job.
    * 
-   * @param conf
-   *          The Hadoop configuration object
-   * @param enable
-   *          if true, enable usage of the IsolatedScanner. Otherwise, disable.
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
    */
-  public static void setIsolated(Configuration conf, boolean enable) {
-    conf.setBoolean(ISOLATED, enable);
+  public static void setScanIsolation(Job job, boolean enableFeature) {
+    job.getConfiguration().setBoolean(ISOLATED, enableFeature);
   }
   
   /**
-   * Enable or disable use of the {@link ClientSideIteratorScanner} in this configuration object. By default it is not enabled.
+   * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
+   * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
    * 
-   * @param conf
-   *          The Hadoop configuration object
-   * @param enable
-   *          if true, enable usage of the ClientSideInteratorScanner. Otherwise, disable.
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
    */
-  public static void setLocalIterators(Configuration conf, boolean enable) {
-    conf.setBoolean(LOCAL_ITERATORS, enable);
+  public static void setLocalIterators(Job job, boolean enableFeature) {
+    job.getConfiguration().setBoolean(LOCAL_ITERATORS, enableFeature);
   }
   
   /**
-   * Initialize the user, table, and authorization information for the configuration object that will be used with an Accumulo InputFormat.
+   * Sets the minimum information needed to query Accumulo in this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param user
-   *          a valid accumulo user
+   *          a valid Accumulo user name
    * @param passwd
    *          the user's password
    * @param table
-   *          the table to read
+   *          the table from which to read
    * @param auths
    *          the authorizations used to restrict data read
+   * @since 1.5.0
    */
-  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
-    if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
+  public static void setInputInfo(Job job, String user, byte[] passwd, String table, Authorizations auths) {
+    if (job.getConfiguration().getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
       throw new IllegalStateException("Input info can only be set once per job");
-    conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
+    job.getConfiguration().setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
     
     ArgumentChecker.notNull(user, passwd, table);
-    conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    conf.set(TABLE_NAME, table);
+    job.getConfiguration().set(USERNAME, user);
+    job.getConfiguration().set(PASSWORD, new String(Base64.encodeBase64(passwd)));
+    job.getConfiguration().set(TABLE_NAME, table);
     if (auths != null && !auths.isEmpty())
-      conf.set(AUTHORIZATIONS, auths.serialize());
+      job.getConfiguration().set(AUTHORIZATIONS, auths.serialize());
   }
   
   /**
-   * Configure a {@link ZooKeeperInstance} for this configuration object.
+   * Configures a {@link ZooKeeperInstance} for this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param instanceName
-   *          the accumulo instance name
+   *          the Accumulo instance name
    * @param zooKeepers
    *          a comma-separated list of zookeeper servers
+   * @since 1.5.0
    */
-  public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
-    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
+  public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
+    if (job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false))
       throw new IllegalStateException("Instance info can only be set once per job");
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
+    job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true);
     
     ArgumentChecker.notNull(instanceName, zooKeepers);
-    conf.set(INSTANCE_NAME, instanceName);
-    conf.set(ZOOKEEPERS, zooKeepers);
+    job.getConfiguration().set(INSTANCE_NAME, instanceName);
+    job.getConfiguration().set(ZOOKEEPERS, zooKeepers);
   }
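
For illustration, a minimal sketch of wiring up a job with the new Job-based setters introduced in this hunk (imports omitted; the instance name, zookeeper hosts, credentials, and table name are placeholders):

    Job job = new Job(new Configuration());
    job.setInputFormatClass(AccumuloInputFormat.class);
    // where to find the Accumulo instance (placeholder connection details)
    AccumuloInputFormat.setZooKeeperInstance(job, "myInstance", "zkhost1:2181,zkhost2:2181");
    // credentials, source table, and scan authorizations; these may only be set once per job
    AccumuloInputFormat.setInputInfo(job, "reader", "readerPass".getBytes(), "mytable", new Authorizations("public"));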
   
   /**
-   * Configure a {@link MockInstance} for this configuration object.
+   * Configures a {@link MockInstance} for this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param instanceName
-   *          the accumulo instance name
+   *          the Accumulo instance name
+   * @since 1.5.0
    */
-  public static void setMockInstance(Configuration conf, String instanceName) {
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    conf.setBoolean(MOCK, true);
-    conf.set(INSTANCE_NAME, instanceName);
+  public static void setMockInstance(Job job, String instanceName) {
+    job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true);
+    job.getConfiguration().setBoolean(MOCK, true);
+    job.getConfiguration().set(INSTANCE_NAME, instanceName);
   }
   
   /**
-   * Set the ranges to map over for this configuration object.
+   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param ranges
    *          the ranges that will be mapped over
+   * @since 1.5.0
    */
-  public static void setRanges(Configuration conf, Collection<Range> ranges) {
+  public static void setRanges(Job job, Collection<Range> ranges) {
     ArgumentChecker.notNull(ranges);
     ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
     try {
@@ -240,78 +375,73 @@ public abstract class InputFormatBase<K,
     } catch (IOException ex) {
       throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
     }
-    conf.setStrings(RANGES, rangeStrings.toArray(new String[0]));
+    job.getConfiguration().setStrings(RANGES, rangeStrings.toArray(new String[0]));
   }
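
For illustration, restricting the job to a couple of row ranges might look like the following sketch (row boundaries are placeholders):

    List<Range> ranges = new ArrayList<Range>();
    ranges.add(new Range("a", "m"));                                  // rows "a" through "m"
    ranges.add(new Range(new Text("row_100"), new Text("row_200")));  // a second, disjoint range
    AccumuloInputFormat.setRanges(job, ranges);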
   
   /**
-   * Disables the adjustment of ranges for this configuration object. By default, overlapping ranges will be merged and ranges will be fit to existing tablet
-   * boundaries. Disabling this adjustment will cause there to be exactly one mapper per range set using {@link #setRanges(Configuration, Collection)}.
+   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
+   * Disabling this feature will cause exactly one Map task to be created for each specified range.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   */
-  public static void disableAutoAdjustRanges(Configuration conf) {
-    conf.setBoolean(AUTO_ADJUST_RANGES, false);
-  }
-  
-  /**
-   * Sets the max # of values that may be returned for an individual Accumulo cell. By default, applied before all other Accumulo iterators (highest priority)
-   * leveraged in the scan by the record reader. To adjust priority use setIterator() & setIteratorOptions() w/ the VersioningIterator type explicitly.
+   * <p>
+   * By default, this feature is <b>enabled</b>.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @param maxVersions
-   *          the max number of versions per accumulo cell
-   * @throws IOException
-   *           if maxVersions is < 1
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @see #setRanges(Job, Collection)
+   * @since 1.5.0
    */
-  public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
-    if (maxVersions < 1)
-      throw new IOException("Invalid maxVersions: " + maxVersions + ".  Must be >= 1");
-    conf.setInt(MAX_VERSIONS, maxVersions);
+  public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
+    job.getConfiguration().setBoolean(AUTO_ADJUST_RANGES, enableFeature);
   }
   
   /**
    * <p>
-   * Enable reading offline tables. This will make the map reduce job directly read the tables files. If the table is not offline, then the job will fail. If
-   * the table comes online during the map reduce job, its likely that the job will fail.
+   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
+   * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
+   * fail.
    * 
    * <p>
-   * To use this option, the map reduce user will need access to read the accumulo directory in HDFS.
+   * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
    * 
    * <p>
    * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
-   * on the mappers classpath. The accumulo-site.xml may need to be on the mappers classpath if HDFS or the accumlo directory in HDFS are non-standard.
+   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS is non-standard.
    * 
    * <p>
    * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
    * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
-   * reason to do this is that compaction will reduce each tablet in the table to one file, and its faster to read from one file.
+   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
    * 
    * <p>
    * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
    * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server.
    * 
-   * @param conf
-   *          the job
-   * @param scanOff
-   *          pass true to read offline tables
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
    */
-  
-  public static void setScanOffline(Configuration conf, boolean scanOff) {
-    conf.setBoolean(READ_OFFLINE, scanOff);
+  public static void setOfflineTableScan(Job job, boolean enableFeature) {
+    job.getConfiguration().setBoolean(READ_OFFLINE, enableFeature);
   }
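
One possible shape of the clone-then-scan-offline workflow described above, sketched against the TableOperations API (the connector and table names are placeholders; error handling omitted):

    // clone the source table and take the clone offline, then point the job at the clone
    connector.tableOperations().clone("mytable", "mytable_offline", true,
        Collections.<String,String> emptyMap(), Collections.<String> emptySet());
    connector.tableOperations().offline("mytable_offline");
    AccumuloInputFormat.setOfflineTableScan(job, true);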
   
   /**
-   * Restricts the columns that will be mapped over for this configuration object.
+   * Restricts the columns that will be mapped over for this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param columnFamilyColumnQualifierPairs
-   *          A pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
+   *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
    *          selected. An empty set is the default and is equivalent to scanning all columns.
+   * @since 1.5.0
    */
-  public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
+  public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
     ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
     for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
@@ -323,34 +453,36 @@ public abstract class InputFormatBase<K,
         col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())));
       columnStrings.add(col);
     }
-    conf.setStrings(COLUMNS, columnStrings.toArray(new String[0]));
+    job.getConfiguration().setStrings(COLUMNS, columnStrings.toArray(new String[0]));
   }
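
For illustration, fetching one specific column and one entire column family might look like this sketch (family and qualifier names are placeholders):

    Collection<Pair<Text,Text>> columns = new ArrayList<Pair<Text,Text>>();
    columns.add(new Pair<Text,Text>(new Text("attributes"), new Text("color"))); // a single family:qualifier
    columns.add(new Pair<Text,Text>(new Text("metadata"), null));                // an entire column family
    AccumuloInputFormat.fetchColumns(job, columns);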
   
   /**
-   * Sets the log level for this configuration object.
+   * Sets the log level for this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param level
    *          the logging level
+   * @since 1.5.0
    */
-  public static void setLogLevel(Configuration conf, Level level) {
+  public static void setLogLevel(Job job, Level level) {
     ArgumentChecker.notNull(level);
     log.setLevel(level);
-    conf.setInt(LOGLEVEL, level.toInt());
+    job.getConfiguration().setInt(LOGLEVEL, level.toInt());
   }
   
   /**
-   * Encode an iterator on the input for this configuration object.
+   * Encode an iterator on the input for this job.
    * 
-   * @param conf
-   *          The Hadoop configuration in which to save the iterator configuration
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param cfg
-   *          The configuration of the iterator
+   *          the configuration of the iterator
+   * @since 1.5.0
    */
-  public static void addIterator(Configuration conf, IteratorSetting cfg) {
+  public static void addIterator(Job job, IteratorSetting cfg) {
     // First check to see if anything has been set already
-    String iterators = conf.get(ITERATORS);
+    String iterators = job.getConfiguration().get(ITERATORS);
     
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     String newIter;
@@ -370,132 +502,140 @@ public abstract class InputFormatBase<K,
       iterators = iterators.concat(ITERATORS_DELIM + newIter);
     }
     // Store the iterators w/ the job
-    conf.set(ITERATORS, iterators);
+    job.getConfiguration().set(ITERATORS, iterators);
   }
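
For illustration, the replacement for the deprecated setMaxVersions() (see the deprecated section at the bottom of this file) is to add a VersioningIterator explicitly, roughly like this:

    // limit each cell to its two most recent versions at scan time
    IteratorSetting versioning = new IteratorSetting(1, "vers", VersioningIterator.class);
    VersioningIterator.setMaxVersions(versioning, 2);
    AccumuloInputFormat.addIterator(job, versioning);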
   
   /**
    * Determines whether a configuration has isolation enabled.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return true if isolation is enabled, false otherwise
-   * @see #setIsolated(Configuration, boolean)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setScanIsolation(Job, boolean)
    */
-  protected static boolean isIsolated(Configuration conf) {
-    return conf.getBoolean(ISOLATED, false);
+  protected static boolean isIsolated(JobContext context) {
+    return context.getConfiguration().getBoolean(ISOLATED, false);
   }
   
   /**
    * Determines whether a configuration uses local iterators.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return true if uses local iterators, false otherwise
-   * @see #setLocalIterators(Configuration, boolean)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setLocalIterators(Job, boolean)
    */
-  protected static boolean usesLocalIterators(Configuration conf) {
-    return conf.getBoolean(LOCAL_ITERATORS, false);
+  protected static boolean usesLocalIterators(JobContext context) {
+    return context.getConfiguration().getBoolean(LOCAL_ITERATORS, false);
   }
   
   /**
    * Gets the user name from the configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param context
+   *          the Hadoop context for the configured job
    * @return the user name
-   * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
+   * @since 1.5.0
+   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
    */
-  protected static String getUsername(Configuration conf) {
-    return conf.get(USERNAME);
+  protected static String getUsername(JobContext context) {
+    return context.getConfiguration().get(USERNAME);
   }
   
-  
   /**
    * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
    * provide a charset safe conversion to a string, and is not intended to be secure.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return the BASE64-encoded password
-   * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the decoded user password
+   * @since 1.5.0
+   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
    */
-  protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+  protected static byte[] getPassword(JobContext context) {
+    return Base64.decodeBase64(context.getConfiguration().get(PASSWORD, "").getBytes());
   }
   
   /**
    * Gets the table name from the configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param context
+   *          the Hadoop context for the configured job
    * @return the table name
-   * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
+   * @since 1.5.0
+   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
    */
-  protected static String getTablename(Configuration conf) {
-    return conf.get(TABLE_NAME);
+  protected static String getTablename(JobContext context) {
+    return context.getConfiguration().get(TABLE_NAME);
   }
   
   /**
    * Gets the authorizations to set for the scans from the configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return the accumulo scan authorizations
-   * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the Accumulo scan authorizations
+   * @since 1.5.0
+   * @see #setInputInfo(Job, String, byte[], String, Authorizations)
    */
-  protected static Authorizations getAuthorizations(Configuration conf) {
-    String authString = conf.get(AUTHORIZATIONS);
+  protected static Authorizations getAuthorizations(JobContext context) {
+    String authString = context.getConfiguration().get(AUTHORIZATIONS);
     return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.getBytes());
   }
   
   /**
    * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return an accumulo instance
-   * @see #setZooKeeperInstance(Configuration, String, String)
-   * @see #setMockInstance(Configuration, String)
-   */
-  protected static Instance getInstance(Configuration conf) {
-    if (conf.getBoolean(MOCK, false))
-      return new MockInstance(conf.get(INSTANCE_NAME));
-    return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return an Accumulo instance
+   * @since 1.5.0
+   * @see #setZooKeeperInstance(Job, String, String)
+   * @see #setMockInstance(Job, String)
+   */
+  protected static Instance getInstance(JobContext context) {
+    if (context.getConfiguration().getBoolean(MOCK, false))
+      return new MockInstance(context.getConfiguration().get(INSTANCE_NAME));
+    return new ZooKeeperInstance(context.getConfiguration().get(INSTANCE_NAME), context.getConfiguration().get(ZOOKEEPERS));
   }
   
   /**
    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return an accumulo tablet locator
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return an Accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
+   * @since 1.5.0
    */
-  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
-    if (conf.getBoolean(MOCK, false))
+  protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
+    if (context.getConfiguration().getBoolean(MOCK, false))
       return new MockTabletLocator();
-    Instance instance = getInstance(conf);
-    String username = getUsername(conf);
-    byte[] password = getPassword(conf);
-    String tableName = getTablename(conf);
+    Instance instance = getInstance(context);
+    String username = getUsername(context);
+    byte[] password = getPassword(context);
+    String tableName = getTablename(context);
     return TabletLocator.getInstance(instance, new AuthInfo(username, ByteBuffer.wrap(password), instance.getInstanceID()),
         new Text(Tables.getTableId(instance, tableName)));
   }
   
   /**
-   * Gets the ranges to scan over from a configuration object.
+   * Gets the ranges to scan over from a job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param context
+   *          the Hadoop context for the configured job
    * @return the ranges
    * @throws IOException
    *           if the ranges have been encoded improperly
-   * @see #setRanges(Configuration, Collection)
+   * @since 1.5.0
+   * @see #setRanges(Job, Collection)
    */
-  protected static List<Range> getRanges(Configuration conf) throws IOException {
+  protected static List<Range> getRanges(JobContext context) throws IOException {
     ArrayList<Range> ranges = new ArrayList<Range>();
-    for (String rangeString : conf.getStringCollection(RANGES)) {
+    for (String rangeString : context.getConfiguration().getStringCollection(RANGES)) {
       ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
       Range range = new Range();
       range.readFields(new DataInputStream(bais));
@@ -505,16 +645,17 @@ public abstract class InputFormatBase<K,
   }
   
   /**
-   * Gets the columns to be mapped over from this configuration object.
+   * Gets the columns to be mapped over from this job.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param context
+   *          the Hadoop context for the configured job
    * @return a set of columns
-   * @see #fetchColumns(Configuration, Collection)
+   * @since 1.5.0
+   * @see #fetchColumns(Job, Collection)
    */
-  protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
+  protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
     Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
-    for (String col : conf.getStringCollection(COLUMNS)) {
+    for (String col : context.getConfiguration().getStringCollection(COLUMNS)) {
       int idx = col.indexOf(":");
       Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes()) : Base64.decodeBase64(col.substring(0, idx).getBytes()));
       Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes()));
@@ -526,52 +667,55 @@ public abstract class InputFormatBase<K,
   /**
    * Determines whether a configuration has auto-adjust ranges enabled.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return true if auto-adjust is enabled, false otherwise
-   * @see #disableAutoAdjustRanges(Configuration)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return false if the feature is disabled, true otherwise
+   * @since 1.5.0
+   * @see #setAutoAdjustRanges(Job, boolean)
    */
-  protected static boolean getAutoAdjustRanges(Configuration conf) {
-    return conf.getBoolean(AUTO_ADJUST_RANGES, true);
+  protected static boolean getAutoAdjustRanges(JobContext context) {
+    return context.getConfiguration().getBoolean(AUTO_ADJUST_RANGES, true);
   }
   
   /**
    * Gets the log level from this configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param context
+   *          the Hadoop context for the configured job
    * @return the log level
-   * @see #setLogLevel(Configuration, Level)
+   * @since 1.5.0
+   * @see #setLogLevel(Job, Level)
    */
-  protected static Level getLogLevel(Configuration conf) {
-    return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
+  protected static Level getLogLevel(JobContext context) {
+    return Level.toLevel(context.getConfiguration().getInt(LOGLEVEL, Level.INFO.toInt()));
   }
   
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
    * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param context
+   *          the Hadoop context for the configured job
    * @throws IOException
-   *           if the configuration is improperly configured
+   *           if the context is improperly configured
+   * @since 1.5.0
    */
-  protected static void validateOptions(Configuration conf) throws IOException {
-    if (!conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
+  protected static void validateOptions(JobContext context) throws IOException {
+    if (!context.getConfiguration().getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
       throw new IOException("Input info has not been set.");
-    if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
+    if (!context.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false))
       throw new IOException("Instance info has not been set.");
     // validate that we can connect as configured
     try {
-      Connector c = getInstance(conf).getConnector(getUsername(conf), getPassword(conf));
-      if (!c.securityOperations().authenticateUser(getUsername(conf), getPassword(conf)))
+      Connector c = getInstance(context).getConnector(getUsername(context), getPassword(context));
+      if (!c.securityOperations().authenticateUser(getUsername(context), getPassword(context)))
         throw new IOException("Unable to authenticate user");
-      if (!c.securityOperations().hasTablePermission(getUsername(conf), getTablename(conf), TablePermission.READ))
+      if (!c.securityOperations().hasTablePermission(getUsername(context), getTablename(context), TablePermission.READ))
         throw new IOException("Unable to access table");
       
-      if (!usesLocalIterators(conf)) {
+      if (!usesLocalIterators(context)) {
         // validate that any scan-time iterators can be loaded by the tablet servers
-        for (IteratorSetting iter : getIterators(conf)) {
+        for (IteratorSetting iter : getIterators(context)) {
           if (!c.instanceOperations().testClassLoad(iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
             throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
         }
@@ -585,41 +729,37 @@ public abstract class InputFormatBase<K,
   }
   
   /**
-   * Gets the maxVersions to use for the {@link VersioningIterator} from this configuration.
+   * Determines whether a configuration has the offline table scan feature enabled.
    * 
-   * @param conf
-   *          the Hadoop configuration object
-   * @return the max versions, -1 if not configured
-   * @see #setMaxVersions(Configuration, int)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setOfflineTableScan(Job, boolean)
    */
-  protected static int getMaxVersions(Configuration conf) {
-    return conf.getInt(MAX_VERSIONS, -1);
-  }
-  
-  protected static boolean isOfflineScan(Configuration conf) {
-    return conf.getBoolean(READ_OFFLINE, false);
+  protected static boolean isOfflineScan(JobContext context) {
+    return context.getConfiguration().getBoolean(READ_OFFLINE, false);
   }
   
-  // Return a list of the iterator settings (for iterators to apply to a scanner)
-  
   /**
    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
    * 
-   * @param conf
-   *          the Hadoop configuration object
+   * @param context
+   *          the Hadoop context for the configured job
    * @return a list of iterators
-   * @see #addIterator(Configuration, IteratorSetting)
+   * @since 1.5.0
+   * @see #addIterator(Job, IteratorSetting)
    */
-  protected static List<IteratorSetting> getIterators(Configuration conf) {
+  protected static List<IteratorSetting> getIterators(JobContext context) {
     
-    String iterators = conf.get(ITERATORS);
+    String iterators = context.getConfiguration().get(ITERATORS);
     
     // If no iterators are present, return an empty list
     if (iterators == null || iterators.isEmpty())
       return new ArrayList<IteratorSetting>();
     
     // Compose the set of iterators encoded in the job configuration
-    StringTokenizer tokens = new StringTokenizer(conf.get(ITERATORS), ITERATORS_DELIM);
+    StringTokenizer tokens = new StringTokenizer(context.getConfiguration().get(ITERATORS), ITERATORS_DELIM);
     List<IteratorSetting> list = new ArrayList<IteratorSetting>();
     try {
       while (tokens.hasMoreTokens()) {
@@ -634,6 +774,18 @@ public abstract class InputFormatBase<K,
     return list;
   }
   
+  /**
+   * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
+   * types.
+   * 
+   * Subclasses must implement {@link #nextKeyValue()} and use it to update the following variables:
+   * <ul>
+   * <li>K {@link #currentK}</li>
+   * <li>V {@link #currentV}</li>
+   * <li>Key {@link #currentKey} (used for progress reporting)</li>
+   * <li>int {@link #numKeysRead} (used for progress reporting)</li>
+   * </ul>
+   */
   protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V> {
     protected long numKeysRead;
     protected Iterator<Entry<Key,Value>> scannerIterator;
@@ -642,80 +794,57 @@ public abstract class InputFormatBase<K,
     /**
      * Apply the configured iterators from the configuration to the scanner.
      * 
-     * @param conf
-     *          the Hadoop configuration object
+     * @param context
+     *          the Hadoop context for the configured job
      * @param scanner
      *          the scanner to configure
-     * @throws AccumuloException
      */
-    protected void setupIterators(Configuration conf, Scanner scanner) throws AccumuloException {
-      List<IteratorSetting> iterators = getIterators(conf);
+    protected void setupIterators(TaskAttemptContext context, Scanner scanner) {
+      List<IteratorSetting> iterators = getIterators(context);
       for (IteratorSetting iterator : iterators) {
         scanner.addScanIterator(iterator);
       }
     }
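
To illustrate the contract described in the RecordReaderBase javadoc above, a subclass's nextKeyValue() for the Key/Value case might look roughly like the following sketch (modeled loosely on AccumuloInputFormat; not taken verbatim from this commit):

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (scannerIterator.hasNext()) {
          ++numKeysRead;                                 // used for progress reporting
          Entry<Key,Value> entry = scannerIterator.next();
          currentKey = entry.getKey();                   // used for progress reporting
          currentK = currentKey;                         // the key handed to the map task
          currentV = entry.getValue();                   // the value handed to the map task
          return true;
        }
        return false;
      }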
     
     /**
-     * If maxVersions has been set, configure a {@link VersioningIterator} at priority 0 for this scanner.
-     * 
-     * @param conf
-     *          the Hadoop configuration object
-     * @param scanner
-     *          the scanner to configure
-     */
-    protected void setupMaxVersions(Configuration conf, Scanner scanner) {
-      int maxVersions = getMaxVersions(conf);
-      // Check to make sure its a legit value
-      if (maxVersions >= 1) {
-        IteratorSetting vers = new IteratorSetting(0, "vers", VersioningIterator.class);
-        VersioningIterator.setMaxVersions(vers, maxVersions);
-        scanner.addScanIterator(vers);
-      }
-    }
-    
-    /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
+    @Override
     public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
-      initialize(inSplit, attempt.getConfiguration());
-    }
-    
-    public void initialize(InputSplit inSplit, Configuration conf) throws IOException {
       Scanner scanner;
       split = (RangeInputSplit) inSplit;
       log.debug("Initializing input split: " + split.range);
-      Instance instance = getInstance(conf);
-      String user = getUsername(conf);
-      byte[] password = getPassword(conf);
-      Authorizations authorizations = getAuthorizations(conf);
+      Instance instance = getInstance(attempt);
+      String user = getUsername(attempt);
+      byte[] password = getPassword(attempt);
+      Authorizations authorizations = getAuthorizations(attempt);
       
       try {
         log.debug("Creating connector with user: " + user);
         Connector conn = instance.getConnector(user, password);
-        log.debug("Creating scanner for table: " + getTablename(conf));
+        log.debug("Creating scanner for table: " + getTablename(attempt));
         log.debug("Authorizations are: " + authorizations);
-        if (isOfflineScan(conf)) {
+        if (isOfflineScan(attempt)) {
           scanner = new OfflineScanner(instance, new AuthInfo(user, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId(instance,
-              getTablename(conf)), authorizations);
+              getTablename(attempt)), authorizations);
         } else {
-          scanner = conn.createScanner(getTablename(conf), authorizations);
+          scanner = conn.createScanner(getTablename(attempt), authorizations);
         }
-        if (isIsolated(conf)) {
+        if (isIsolated(attempt)) {
           log.info("Creating isolated scanner");
           scanner = new IsolatedScanner(scanner);
         }
-        if (usesLocalIterators(conf)) {
+        if (usesLocalIterators(attempt)) {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupMaxVersions(conf, scanner);
-        setupIterators(conf, scanner);
+        setupIterators(attempt, scanner);
       } catch (Exception e) {
         throw new IOException(e);
       }
       
       // setup a scanner within the bounds of this split
-      for (Pair<Text,Text> c : getFetchedColumns(conf)) {
+      for (Pair<Text,Text> c : getFetchedColumns(attempt)) {
         if (c.getSecond() != null) {
           log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
           scanner.fetchColumn(c.getFirst(), c.getSecond());
@@ -733,8 +862,20 @@ public abstract class InputFormatBase<K,
       scannerIterator = scanner.iterator();
     }
     
+    /**
+     * @deprecated since 1.5.0; Use {@link #initialize(InputSplit, TaskAttemptContext)} instead. <br />
+     *             This class is written for the Hadoop MapReduce framework, and is not guaranteed to function in other frameworks. Other frameworks using this
+     *             class are expected to wrap their objects to conform to the supported framework.
+     */
+    @Deprecated
+    public void initialize(InputSplit inSplit, Configuration conf) throws IOException {
+      initialize(inSplit, new TaskAttemptContext(conf, new TaskAttemptID()));
+    }
+    
+    @Override
     public void close() {}
     
+    @Override
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
@@ -860,17 +1001,14 @@ public abstract class InputFormatBase<K,
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
-    return getSplits(job.getConfiguration());
-  }
-  
-  public List<InputSplit> getSplits(Configuration conf) throws IOException {
-    log.setLevel(getLogLevel(conf));
-    validateOptions(conf);
-    
-    String tableName = getTablename(conf);
-    boolean autoAdjust = getAutoAdjustRanges(conf);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(conf)) : getRanges(conf);
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    log.setLevel(getLogLevel(context));
+    validateOptions(context);
+    
+    String tableName = getTablename(context);
+    boolean autoAdjust = getAutoAdjustRanges(context);
+    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
     
     if (ranges.isEmpty()) {
       ranges = new ArrayList<Range>(1);
@@ -881,17 +1019,17 @@ public abstract class InputFormatBase<K,
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     TabletLocator tl;
     try {
-      if (isOfflineScan(conf)) {
-        binnedRanges = binOfflineTable(conf, tableName, ranges);
+      if (isOfflineScan(context)) {
+        binnedRanges = binOfflineTable(context.getConfiguration(), tableName, ranges);
         while (binnedRanges == null) {
           // Some tablets were still online, try again
           UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-          binnedRanges = binOfflineTable(conf, tableName, ranges);
+          binnedRanges = binOfflineTable(context.getConfiguration(), tableName, ranges);
         }
       } else {
-        Instance instance = getInstance(conf);
+        Instance instance = getInstance(context);
         String tableId = null;
-        tl = getTabletLocator(conf);
+        tl = getTabletLocator(context);
         // it's possible that the cache could contain complete, but old, information about a table's tablets... so clear it
         tl.invalidateCache();
         while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
@@ -955,6 +1093,14 @@ public abstract class InputFormatBase<K,
   }
   
   /**
+   * @deprecated since 1.5.0; Use {@link #getSplits(JobContext)} instead.
+   */
+  @Deprecated
+  public List<InputSplit> getSplits(Configuration conf) throws IOException {
+    return getSplits(new TaskAttemptContext(conf, new TaskAttemptID()));
+  }
+  
+  /**
    * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
    */
   public static class RangeInputSplit extends InputSplit implements Writable {
@@ -1026,6 +1172,7 @@ public abstract class InputFormatBase<K,
     /**
     * This implementation of length is only an estimate; it does not provide exact values. Do not have your code rely on this return value.
      */
+    @Override
     public long getLength() throws IOException {
       Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
       Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
@@ -1045,6 +1192,7 @@ public abstract class InputFormatBase<K,
       return diff + 1;
     }
     
+    @Override
     public String[] getLocations() throws IOException {
       return locations;
     }
@@ -1053,6 +1201,7 @@ public abstract class InputFormatBase<K,
       this.locations = locations;
     }
     
+    @Override
     public void readFields(DataInput in) throws IOException {
       range.readFields(in);
       int numLocs = in.readInt();
@@ -1061,6 +1210,7 @@ public abstract class InputFormatBase<K,
         locations[i] = in.readUTF();
     }
     
+    @Override
     public void write(DataOutput out) throws IOException {
       range.write(out);
       out.writeInt(locations.length);
@@ -1068,4 +1218,498 @@ public abstract class InputFormatBase<K,
         out.writeUTF(locations[i]);
     }
   }
+  
+  // ----------------------------------------------------------------------------------------------------
+  // Everything below this line is deprecated and should go away in future versions
+  // ----------------------------------------------------------------------------------------------------
+  
+  /**
+   * @deprecated since 1.5.0
+   * @see #addIterator(Configuration, IteratorSetting)
+   * @see #getIteratorOptions(Configuration)
+   */
+  @Deprecated
+  private static final String ITERATORS_OPTIONS = PREFIX + ".iterators.options";
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setScanIsolation(Job, boolean)} instead.
+   */
+  @Deprecated
+  public static void setIsolated(Configuration conf, boolean enable) {
+    conf.setBoolean(ISOLATED, enable);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setLocalIterators(Job, boolean)} instead.
+   */
+  @Deprecated
+  public static void setLocalIterators(Configuration conf, boolean enable) {
+    conf.setBoolean(LOCAL_ITERATORS, enable);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setInputInfo(Job, String, byte[], String, Authorizations)} instead.
+   */
+  @Deprecated
+  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
+    if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
+      throw new IllegalStateException("Input info can only be set once per job");
+    conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
+    
+    ArgumentChecker.notNull(user, passwd, table);
+    conf.set(USERNAME, user);
+    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
+    conf.set(TABLE_NAME, table);
+    if (auths != null && !auths.isEmpty())
+      conf.set(AUTHORIZATIONS, auths.serialize());
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead.
+   */
+  @Deprecated
+  public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
+    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
+      throw new IllegalStateException("Instance info can only be set once per job");
+    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
+    
+    ArgumentChecker.notNull(instanceName, zooKeepers);
+    conf.set(INSTANCE_NAME, instanceName);
+    conf.set(ZOOKEEPERS, zooKeepers);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead.
+   */
+  @Deprecated
+  public static void setMockInstance(Configuration conf, String instanceName) {
+    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
+    conf.setBoolean(MOCK, true);
+    conf.set(INSTANCE_NAME, instanceName);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setRanges(Job, Collection)} instead.
+   */
+  @Deprecated
+  public static void setRanges(Configuration conf, Collection<Range> ranges) {
+    ArgumentChecker.notNull(ranges);
+    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
+    try {
+      for (Range r : ranges) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        r.write(new DataOutputStream(baos));
+        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
+      }
+    } catch (IOException ex) {
+      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
+    }
+    conf.setStrings(RANGES, rangeStrings.toArray(new String[0]));
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setAutoAdjustRanges(Job, boolean)} instead.
+   */
+  @Deprecated
+  public static void disableAutoAdjustRanges(Configuration conf) {
+    conf.setBoolean(AUTO_ADJUST_RANGES, false);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
+   */
+  @Deprecated
+  public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
+    if (maxVersions < 1)
+      throw new IOException("Invalid maxVersions: " + maxVersions + ".  Must be >= 1");
+    // Check to make sure its a legit value
+    if (maxVersions >= 1) {
+      IteratorSetting vers = new IteratorSetting(0, "vers", VersioningIterator.class);
+      VersioningIterator.setMaxVersions(vers, maxVersions);
+      addIterator(conf, vers);
+    }
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setOfflineTableScan(Job, boolean)} instead.
+   */
+  @Deprecated
+  public static void setScanOffline(Configuration conf, boolean scanOff) {
+    conf.setBoolean(READ_OFFLINE, scanOff);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #fetchColumns(Job, Collection)} instead.
+   */
+  @Deprecated
+  public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
+    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
+    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
+    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+      if (column.getFirst() == null)
+        throw new IllegalArgumentException("Column family can not be null");
+      
+      String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())));
+      if (column.getSecond() != null)
+        col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())));
+      columnStrings.add(col);
+    }
+    conf.setStrings(COLUMNS, columnStrings.toArray(new String[0]));
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead.
+   */
+  @Deprecated
+  public static void setLogLevel(Configuration conf, Level level) {
+    ArgumentChecker.notNull(level);
+    log.setLevel(level);
+    conf.setInt(LOGLEVEL, level.toInt());
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} instead.
+   */
+  @Deprecated
+  public static void addIterator(Configuration conf, IteratorSetting cfg) {
+    // First check to see if anything has been set already
+    String iterators = conf.get(ITERATORS);
+    
+    // No iterators specified yet, create a new string
+    if (iterators == null || iterators.isEmpty()) {
+      iterators = new AccumuloIterator(cfg.getPriority(), cfg.getIteratorClass(), cfg.getName()).toString();
+    } else {
+      // append the next iterator & reset
+      iterators = iterators.concat(ITERATORS_DELIM + new AccumuloIterator(cfg.getPriority(), cfg.getIteratorClass(), cfg.getName()).toString());
+    }
+    // Store the iterators w/ the job
+    conf.set(ITERATORS, iterators);
+    for (Entry<String,String> entry : cfg.getOptions().entrySet()) {
+      if (entry.getValue() == null)
+        continue;
+      
+      String iteratorOptions = conf.get(ITERATORS_OPTIONS);
+      
+      // No options specified yet, create a new string
+      if (iteratorOptions == null || iteratorOptions.isEmpty()) {
+        iteratorOptions = new AccumuloIteratorOption(cfg.getName(), entry.getKey(), entry.getValue()).toString();
+      } else {
+        // append the next option & reset
+        iteratorOptions = iteratorOptions.concat(ITERATORS_DELIM + new AccumuloIteratorOption(cfg.getName(), entry.getKey(), entry.getValue()));
+      }
+      
+      // Store the options w/ the job
+      conf.set(ITERATORS_OPTIONS, iteratorOptions);
+    }
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #isIsolated(JobContext)} instead.
+   */
+  @Deprecated
+  protected static boolean isIsolated(Configuration conf) {
+    return conf.getBoolean(ISOLATED, false);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #usesLocalIterators(JobContext)} instead.
+   */
+  @Deprecated
+  protected static boolean usesLocalIterators(Configuration conf) {
+    return conf.getBoolean(LOCAL_ITERATORS, false);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getUsername(JobContext)} instead.
+   */
+  @Deprecated
+  protected static String getUsername(Configuration conf) {
+    return conf.get(USERNAME);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getPassword(JobContext)} instead.
+   */
+  @Deprecated
+  protected static byte[] getPassword(Configuration conf) {
+    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getTablename(JobContext)} instead.
+   */
+  @Deprecated
+  protected static String getTablename(Configuration conf) {
+    return conf.get(TABLE_NAME);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getAuthorizations(JobContext)} instead.
+   */
+  @Deprecated
+  protected static Authorizations getAuthorizations(Configuration conf) {
+    String authString = conf.get(AUTHORIZATIONS);
+    return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.split(","));
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead.
+   */
+  @Deprecated
+  protected static Instance getInstance(Configuration conf) {
+    if (conf.getBoolean(MOCK, false))
+      return new MockInstance(conf.get(INSTANCE_NAME));
+    return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getTabletLocator(JobContext)} instead.
+   */
+  @Deprecated
+  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
+    if (conf.getBoolean(MOCK, false))
+      return new MockTabletLocator();
+    Instance instance = getInstance(conf);
+    String username = getUsername(conf);
+    byte[] password = getPassword(conf);
+    String tableName = getTablename(conf);
+    return TabletLocator.getInstance(instance, new AuthInfo(username, ByteBuffer.wrap(password), instance.getInstanceID()),
+        new Text(Tables.getTableId(instance, tableName)));
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getRanges(JobContext)} instead.
+   */
+  @Deprecated
+  protected static List<Range> getRanges(Configuration conf) throws IOException {
+    ArrayList<Range> ranges = new ArrayList<Range>();
+    for (String rangeString : conf.getStringCollection(RANGES)) {
+      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
+      Range range = new Range();
+      range.readFields(new DataInputStream(bais));
+      ranges.add(range);
+    }
+    return ranges;
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getFetchedColumns(JobContext)} instead.
+   */
+  @Deprecated
+  protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
+    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
+    for (String col : conf.getStringCollection(COLUMNS)) {
+      int idx = col.indexOf(":");
+      Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes()) : Base64.decodeBase64(col.substring(0, idx).getBytes()));
+      Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes()));
+      columns.add(new Pair<Text,Text>(cf, cq));
+    }
+    return columns;
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getAutoAdjustRanges(JobContext)} instead.
+   */
+  @Deprecated
+  protected static boolean getAutoAdjustRanges(Configuration conf) {
+    return conf.getBoolean(AUTO_ADJUST_RANGES, true);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead.
+   */
+  @Deprecated
+  protected static Level getLogLevel(Configuration conf) {
+    return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #validateOptions(JobContext)} instead.
+   */
+  @Deprecated
+  protected static void validateOptions(Configuration conf) throws IOException {
+    if (!conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
+      throw new IOException("Input info has not been set.");
+    if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
+      throw new IOException("Instance info has not been set.");
+    // validate that we can connect as configured
+    try {
+      Connector c = getInstance(conf).getConnector(getUsername(conf), getPassword(conf));
+      if (!c.securityOperations().authenticateUser(getUsername(conf), getPassword(conf)))
+        throw new IOException("Unable to authenticate user");
+      if (!c.securityOperations().hasTablePermission(getUsername(conf), getTablename(conf), TablePermission.READ))
+        throw new IOException("Unable to access table");
+      
+      if (!usesLocalIterators(conf)) {
+        // validate that any scan-time iterators can be loaded by the tablet servers
+        for (AccumuloIterator iter : getIterators(conf)) {
+          if (!c.instanceOperations().testClassLoad(iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
+            throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
+        }
+      }
+      
+    } catch (AccumuloException e) {
+      throw new IOException(e);
+    } catch (AccumuloSecurityException e) {
+      throw new IOException(e);
+    }
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
+   */
+  @Deprecated
+  protected static int getMaxVersions(Configuration conf) {
+    return -1;
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #isOfflineScan(JobContext)} instead.
+   */
+  @Deprecated
+  protected static boolean isOfflineScan(Configuration conf) {
+    return conf.getBoolean(READ_OFFLINE, false);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
+   */
+  @Deprecated
+  protected static List<AccumuloIterator> getIterators(Configuration conf) {
+    
+    String iterators = conf.get(ITERATORS);
+    
+    // If no iterators are present, return an empty list
+    if (iterators == null || iterators.isEmpty())
+      return new ArrayList<AccumuloIterator>();
+    
+    // Compose the set of iterators encoded in the job configuration
+    StringTokenizer tokens = new StringTokenizer(conf.get(ITERATORS), ITERATORS_DELIM);
+    List<AccumuloIterator> list = new ArrayList<AccumuloIterator>();
+    while (tokens.hasMoreTokens()) {
+      String itstring = tokens.nextToken();
+      list.add(new AccumuloIterator(itstring));
+    }
+    return list;
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
+   */
+  @Deprecated
+  protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) {
+    String iteratorOptions = conf.get(ITERATORS_OPTIONS);
+    
+    // If no options are present, return an empty list
+    if (iteratorOptions == null || iteratorOptions.isEmpty())
+      return new ArrayList<AccumuloIteratorOption>();
+    
+    // Compose the set of options encoded in the job configuration
+    StringTokenizer tokens = new StringTokenizer(conf.get(ITERATORS_OPTIONS), ITERATORS_DELIM);
+    List<AccumuloIteratorOption> list = new ArrayList<AccumuloIteratorOption>();
+    while (tokens.hasMoreTokens()) {
+      String optionString = tokens.nextToken();
+      list.add(new AccumuloIteratorOption(optionString));
+    }
+    return list;
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
+   */
+  @Deprecated
+  static class AccumuloIterator {
+    
+    private static final String FIELD_SEP = ":";
+    
+    private int priority;
+    private String iteratorClass;
+    private String iteratorName;
+    
+    public AccumuloIterator(int priority, String iteratorClass, String iteratorName) {
+      this.priority = priority;
+      this.iteratorClass = iteratorClass;
+      this.iteratorName = iteratorName;
+    }
+    
+    // Parses out a setting given a string supplied from an earlier toString() call
+    public AccumuloIterator(String iteratorSetting) {
+      // Parse the string to expand the iterator
+      StringTokenizer tokenizer = new StringTokenizer(iteratorSetting, FIELD_SEP);
+      priority = Integer.parseInt(tokenizer.nextToken());
+      iteratorClass = tokenizer.nextToken();
+      iteratorName = tokenizer.nextToken();
+    }
+    
+    public int getPriority() {
+      return priority;
+    }
+    
+    public String getIteratorClass() {
+      return iteratorClass;
+    }
+    
+    public String getIteratorName() {
+      return iteratorName;
+    }
+    
+    @Override
+    public String toString() {
+      return priority + FIELD_SEP + iteratorClass + FIELD_SEP + iteratorName;
+    }
+    
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
+   */
+  @Deprecated
+  static class AccumuloIteratorOption {
+    private static final String FIELD_SEP = ":";
+    
+    private String iteratorName;
+    private String key;
+    private String value;
+    
+    public AccumuloIteratorOption(String iteratorName, String key, String value) {
+      this.iteratorName = iteratorName;
+      this.key = key;
+      this.value = value;
+    }
+    
+    // Parses out an option given a string supplied from an earlier toString() call
+    public AccumuloIteratorOption(String iteratorOption) {
+      StringTokenizer tokenizer = new StringTokenizer(iteratorOption, FIELD_SEP);
+      this.iteratorName = tokenizer.nextToken();
+      try {
+        this.key = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
+        this.value = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+    public String getIteratorName() {
+      return iteratorName;
+    }
+    
+    public String getKey() {
+      return key;
+    }
+    
+    public String getValue() {
+      return value;
+    }
+    
+    @Override
+    public String toString() {
+      try {
+        return iteratorName + FIELD_SEP + URLEncoder.encode(key, "UTF-8") + FIELD_SEP + URLEncoder.encode(value, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+  }
+  
 }

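For readers migrating existing jobs, the following is a minimal sketch of moving from the deprecated Configuration-based setters above to the Job-based methods used in the updated tests below. The user name, password, table, and instance names are placeholders, not part of this commit.

    import org.apache.accumulo.core.Constants;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class JobBasedSetupSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setInputFormatClass(AccumuloInputFormat.class);

        // old (deprecated since 1.5.0): setters took job.getConfiguration()
        //   AccumuloInputFormat.setInputInfo(job.getConfiguration(), "user", "pass".getBytes(), "testtable", Constants.NO_AUTHS);
        // new: setters take the Job itself
        AccumuloInputFormat.setInputInfo(job, "user", "pass".getBytes(), "testtable", Constants.NO_AUTHS);
        AccumuloInputFormat.setMockInstance(job, "testinstance");

        // iterators are configured the same way, against the Job rather than its Configuration
        IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator");
        AccumuloInputFormat.addIterator(job, is);
      }
    }
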
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/VersioningIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/VersioningIterator.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/VersioningIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/VersioningIterator.java Tue Jan 15 23:50:42 2013
@@ -160,6 +160,8 @@ public class VersioningIterator extends 
    * @param maxVersions
    */
   public static void setMaxVersions(IteratorSetting cfg, int maxVersions) {
+    if (maxVersions < 1)
+      throw new IllegalArgumentException(MAXVERSIONS_OPT + " for versioning iterator must be >= 1");
     cfg.addOption(MAXVERSIONS_OPT, Integer.toString(maxVersions));
   }
 }

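Alongside the deprecation of setMaxVersions above, the iterator-side setter now rejects non-positive values up front. A minimal sketch of the tightened check follows; the priority 20 and the iterator name "vers" are illustrative values only.

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.iterators.user.VersioningIterator;

    public class MaxVersionsSketch {
      public static void main(String[] args) {
        IteratorSetting vers = new IteratorSetting(20, "vers", VersioningIterator.class);
        VersioningIterator.setMaxVersions(vers, 3);   // accepted: stores "3" under the maxVersions option
        // VersioningIterator.setMaxVersions(vers, 0); // would now throw IllegalArgumentException
      }
    }
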
Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java Tue Jan 15 23:50:42 2013
@@ -138,8 +138,8 @@ public class AccumuloFileOutputFormatTes
       Authorizations authorizations;
       authorizations = Constants.NO_AUTHS;
       
-      AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), table, authorizations);
-      AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testinstance");
+      AccumuloInputFormat.setInputInfo(job, user, pass.getBytes(), table, authorizations);
+      AccumuloInputFormat.setMockInstance(job, "testinstance");
       AccumuloFileOutputFormat.setOutputPath(job, new Path(args[3]));
       
       job.setMapperClass("badtable".equals(table) ? BadKeyMapper.class : Mapper.class);
@@ -195,7 +195,7 @@ public class AccumuloFileOutputFormatTes
     long b = 300l;
     long c = 50l;
     long d = 10l;
-    String e = "type";
+    String e = "snappy";
     
     Job job = new Job();
     AccumuloFileOutputFormat.setReplication(job, a);
@@ -206,10 +206,32 @@ public class AccumuloFileOutputFormatTes
     
     AccumuloConfiguration acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job);
     
-    assertEquals(a, acuconf.getCount(Property.TABLE_FILE_REPLICATION));
-    assertEquals(b, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
-    assertEquals(c, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
-    assertEquals(d, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
-    assertEquals(e, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+    assertEquals(7, acuconf.getCount(Property.TABLE_FILE_REPLICATION));
+    assertEquals(300l, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
+    assertEquals(50l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+    assertEquals(10l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+    assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+    
+    a = 17;
+    b = 1300l;
+    c = 150l;
+    d = 110l;
+    e = "lzo";
+    
+    job = new Job();
+    AccumuloFileOutputFormat.setReplication(job, a);
+    AccumuloFileOutputFormat.setFileBlockSize(job, b);
+    AccumuloFileOutputFormat.setDataBlockSize(job, c);
+    AccumuloFileOutputFormat.setIndexBlockSize(job, d);
+    AccumuloFileOutputFormat.setCompressionType(job, e);
+    
+    acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job);
+    
+    assertEquals(17, acuconf.getCount(Property.TABLE_FILE_REPLICATION));
+    assertEquals(1300l, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
+    assertEquals(150l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+    assertEquals(110l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+    assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+    
   }
 }

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Tue Jan 15 23:50:42 2013
@@ -46,51 +46,10 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.junit.After;
 import org.junit.Test;
 
 public class AccumuloInputFormatTest {
   
-  @After
-  public void tearDown() throws Exception {}
-  
-  /**
-   * Test basic setting & getting of max versions.
-   * 
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  @Test
-  public void testMaxVersions() throws IOException {
-    Job job = new Job();
-    AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 1);
-    int version = AccumuloInputFormat.getMaxVersions(job.getConfiguration());
-    assertEquals(1, version);
-  }
-  
-  /**
-   * Test max versions with an invalid value.
-   * 
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  @Test(expected = IOException.class)
-  public void testMaxVersionsLessThan1() throws IOException {
-    Job job = new Job();
-    AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 0);
-  }
-  
-  /**
-   * Test no max version configured.
-   * 
-   * @throws IOException
-   */
-  @Test
-  public void testNoMaxVersion() throws IOException {
-    Job job = new Job();
-    assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConfiguration()));
-  }
-  
   /**
    * Check that the iterator configuration is getting stored in the Job conf correctly.
    * 
@@ -101,7 +60,7 @@ public class AccumuloInputFormatTest {
     Job job = new Job();
     
     IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator");
-    AccumuloInputFormat.addIterator(job.getConfiguration(), is);
+    AccumuloInputFormat.addIterator(job, is);
     Configuration conf = job.getConfiguration();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     is.write(new DataOutputStream(baos));
@@ -113,14 +72,14 @@ public class AccumuloInputFormatTest {
   public void testAddIterator() throws IOException {
     Job job = new Job();
     
-    AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
-    AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
     IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator");
     iter.addOption("v1", "1");
     iter.addOption("junk", "\0omg:!\\xyzzy");
-    AccumuloInputFormat.addIterator(job.getConfiguration(), iter);
+    AccumuloInputFormat.addIterator(job, iter);
     
-    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job.getConfiguration());
+    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
     
     // Check the list size
     assertTrue(list.size() == 3);
@@ -160,9 +119,9 @@ public class AccumuloInputFormatTest {
     IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class");
     someSetting.addOption(key, value);
     Job job = new Job();
-    AccumuloInputFormat.addIterator(job.getConfiguration(), someSetting);
+    AccumuloInputFormat.addIterator(job, someSetting);
     
-    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job.getConfiguration());
+    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
     assertEquals(1, list.size());
     assertEquals(1, list.get(0).getOptions().size());
     assertEquals(list.get(0).getOptions().get(key), value);
@@ -170,8 +129,8 @@ public class AccumuloInputFormatTest {
     someSetting.addOption(key + "2", value);
     someSetting.setPriority(2);
     someSetting.setName("it2");
-    AccumuloInputFormat.addIterator(job.getConfiguration(), someSetting);
-    list = AccumuloInputFormat.getIterators(job.getConfiguration());
+    AccumuloInputFormat.addIterator(job, someSetting);
+    list = AccumuloInputFormat.getIterators(job);
     assertEquals(2, list.size());
     assertEquals(1, list.get(0).getOptions().size());
     assertEquals(list.get(0).getOptions().get(key), value);
@@ -189,11 +148,11 @@ public class AccumuloInputFormatTest {
   public void testGetIteratorSettings() throws IOException {
     Job job = new Job();
     
-    AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
-    AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
-    AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"));
     
-    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job.getConfiguration());
+    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
     
     // Check the list size
     assertTrue(list.size() == 3);
@@ -224,9 +183,9 @@ public class AccumuloInputFormatTest {
     
     IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
     RegExFilter.setRegexs(is, regex, null, null, null, false);
-    AccumuloInputFormat.addIterator(job.getConfiguration(), is);
+    AccumuloInputFormat.addIterator(job, is);
     
-    assertTrue(regex.equals(AccumuloInputFormat.getIterators(job.getConfiguration()).get(0).getName()));
+    assertTrue(regex.equals(AccumuloInputFormat.getIterators(job).get(0).getName()));
   }
   
   private static AssertionError e1 = null;
@@ -277,8 +236,8 @@ public class AccumuloInputFormatTest {
       
       job.setInputFormatClass(AccumuloInputFormat.class);
       
-      AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), table, Constants.NO_AUTHS);
-      AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
+      AccumuloInputFormat.setInputInfo(job, user, pass.getBytes(), table, Constants.NO_AUTHS);
+      AccumuloInputFormat.setMockInstance(job, "testmapinstance");
       
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java Tue Jan 15 23:50:42 2013
@@ -94,8 +94,8 @@ public class AccumuloOutputFormatTest {
       
       job.setInputFormatClass(AccumuloInputFormat.class);
       
-      AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), table1, Constants.NO_AUTHS);
-      AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmrinstance");
+      AccumuloInputFormat.setInputInfo(job, user, pass.getBytes(), table1, Constants.NO_AUTHS);
+      AccumuloInputFormat.setMockInstance(job, "testmrinstance");
       
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -104,8 +104,8 @@ public class AccumuloOutputFormatTest {
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Mutation.class);
       
-      AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), user, pass.getBytes(), false, table2);
-      AccumuloOutputFormat.setMockInstance(job.getConfiguration(), "testmrinstance");
+      AccumuloOutputFormat.setOutputInfo(job, user, pass.getBytes(), false, table2);
+      AccumuloOutputFormat.setMockInstance(job, "testmrinstance");
       
       job.setNumReduceTasks(0);
       

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java Tue Jan 15 23:50:42 2013
@@ -153,8 +153,8 @@ public class AccumuloRowInputFormatTest 
       
       job.setInputFormatClass(AccumuloRowInputFormat.class);
       
-      AccumuloRowInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), table, Constants.NO_AUTHS);
-      AccumuloRowInputFormat.setMockInstance(job.getConfiguration(), "instance1");
+      AccumuloRowInputFormat.setInputInfo(job, user, pass.getBytes(), table, Constants.NO_AUTHS);
+      AccumuloRowInputFormat.setMockInstance(job, "instance1");
       
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);

Modified: accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java (original)
+++ accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java Tue Jan 15 23:50:42 2013
@@ -37,24 +37,26 @@ import com.beust.jcommander.Parameter;
 
 public class RegexExample extends Configured implements Tool {
   public static class RegexMapper extends Mapper<Key,Value,Key,Value> {
+    @Override
     public void map(Key row, Value data, Context context) throws IOException, InterruptedException {
       context.write(row, data);
     }
   }
   
   static class Opts extends ClientOnRequiredTable {
-    @Parameter(names="--rowRegex")
+    @Parameter(names = "--rowRegex")
     String rowRegex;
-    @Parameter(names="--columnFamilyRegex")
+    @Parameter(names = "--columnFamilyRegex")
     String columnFamilyRegex;
-    @Parameter(names="--columnQualifierRegex")
+    @Parameter(names = "--columnQualifierRegex")
     String columnQualifierRegex;
-    @Parameter(names="--valueRegex")
+    @Parameter(names = "--valueRegex")
     String valueRegex;
-    @Parameter(names="--output", required=true)
+    @Parameter(names = "--output", required = true)
     String destination;
   }
   
+  @Override
   public int run(String[] args) throws Exception {
     Opts opts = new Opts();
     opts.parseArgs(getClass().getName(), args);
@@ -67,7 +69,7 @@ public class RegexExample extends Config
     
     IteratorSetting regex = new IteratorSetting(50, "regex", RegExFilter.class);
     RegExFilter.setRegexs(regex, opts.rowRegex, opts.columnFamilyRegex, opts.columnQualifierRegex, opts.valueRegex, false);
-    AccumuloInputFormat.addIterator(job.getConfiguration(), regex);
+    AccumuloInputFormat.addIterator(job, regex);
     
     job.setMapperClass(RegexMapper.class);
     job.setMapOutputKeyClass(Key.class);

Modified: accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java (original)
+++ accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java Tue Jan 15 23:50:42 2013
@@ -43,6 +43,7 @@ public class RowHash extends Configured 
    * The Mapper class that given a row number, will generate the appropriate output line.
    */
   public static class HashDataMapper extends Mapper<Key,Value,Text,Mutation> {
+    @Override
     public void map(Key row, Value data, Context context) throws IOException, InterruptedException {
       Mutation m = new Mutation(row.getRow());
       m.put(new Text("cf-HASHTYPE"), new Text("cq-MD5BASE64"), new Value(Base64.encodeBase64(MD5Hash.digest(data.toString()).getDigest())));
@@ -55,7 +56,7 @@ public class RowHash extends Configured 
   }
   
   private static class Opts extends ClientOnRequiredTable {
-    @Parameter(names="--column", required=true)
+    @Parameter(names = "--column", required = true)
     String column = null;
   }
   
@@ -73,7 +74,7 @@ public class RowHash extends Configured 
     Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
     Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
     if (cf.getLength() > 0)
-      AccumuloInputFormat.fetchColumns(job.getConfiguration(), Collections.singleton(new Pair<Text,Text>(cf, cq)));
+      AccumuloInputFormat.fetchColumns(job, Collections.singleton(new Pair<Text,Text>(cf, cq)));
     
     job.setMapperClass(HashDataMapper.class);
     job.setMapOutputKeyClass(Text.class);

Modified: accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java (original)
+++ accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java Tue Jan 15 23:50:42 2013
@@ -46,9 +46,9 @@ import com.beust.jcommander.Parameter;
 public class TableToFile extends Configured implements Tool {
   
   static class Opts extends ClientOnRequiredTable {
-    @Parameter(names="--output", description="output directory", required=true)
+    @Parameter(names = "--output", description = "output directory", required = true)
     String output;
-    @Parameter(names="--columns", description="columns to extract, in cf:cq{,cf:cq,...} form")
+    @Parameter(names = "--columns", description = "columns to extract, in cf:cq{,cf:cq,...} form")
     String columns;
   }
   
@@ -56,6 +56,7 @@ public class TableToFile extends Configu
    * The Mapper class that given a row number, will generate the appropriate output line.
    */
   public static class TTFMapper extends Mapper<Key,Value,NullWritable,Text> {
+    @Override
     public void map(Key row, Value data, Context context) throws IOException, InterruptedException {
       final Key r = row;
       final Value v = data;
@@ -99,7 +100,7 @@ public class TableToFile extends Configu
         columnsToFetch.add(new Pair<Text,Text>(cf, cq));
     }
     if (!columnsToFetch.isEmpty())
-      AccumuloInputFormat.fetchColumns(job.getConfiguration(), columnsToFetch);
+      AccumuloInputFormat.fetchColumns(job, columnsToFetch);
     
     job.setMapperClass(TTFMapper.class);
     job.setMapOutputKeyClass(NullWritable.class);


