accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmil...@apache.org
Subject [accumulo] branch master updated: MR Improvements Closes #753 #751 (#765)
Date Mon, 26 Nov 2018 18:51:51 GMT
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new c8f5bf1  MR Improvements Closes #753 #751 (#765)
c8f5bf1 is described below

commit c8f5bf1670bb053b4111926179b458b4a58f884b
Author: Mike Miller <mmiller@apache.org>
AuthorDate: Mon Nov 26 13:51:46 2018 -0500

    MR Improvements Closes #753 #751 (#765)
    
    * Combine MR static method and builder. Closes #753
    * Remove BWConfig from map reduce API.  Closes #751
    * Replace setInfo methods with configure which begins fluent API
    * Replace *Info objects with *FormatBuilders
    * Migrate tests from core to use new API
    
    Co-authored-by: Keith Turner <kturner@apache.org>
    * Limit MR config to Job XOR JobConf (#13)
---
 .../hadoop/mapred/AccumuloFileOutputFormat.java    |  43 +--
 .../hadoop/mapred/AccumuloInputFormat.java         |  57 +---
 .../hadoop/mapred/AccumuloOutputFormat.java        |  29 +-
 .../hadoop/mapred/AccumuloRowInputFormat.java      |  58 +---
 .../hadoop/mapreduce/AccumuloFileOutputFormat.java |  51 +--
 .../hadoop/mapreduce/AccumuloInputFormat.java      |  59 +---
 .../hadoop/mapreduce/AccumuloOutputFormat.java     |  33 +-
 .../hadoop/mapreduce/AccumuloRowInputFormat.java   |  59 +---
 .../hadoop/mapreduce/FileOutputFormatBuilder.java  | 124 +++++++
 .../accumulo/hadoop/mapreduce/FileOutputInfo.java  | 192 -----------
 .../hadoop/mapreduce/InputFormatBuilder.java       | 265 +++++++++++++++
 .../accumulo/hadoop/mapreduce/InputInfo.java       | 364 ---------------------
 .../hadoop/mapreduce/OutputFormatBuilder.java      |  84 +++++
 .../accumulo/hadoop/mapreduce/OutputInfo.java      | 143 --------
 .../mapred/AccumuloOutputFormatImpl.java           |  31 +-
 .../mapreduce/AccumuloOutputFormatImpl.java        |  31 +-
 .../mapreduce/FileOutputFormatBuilderImpl.java     | 155 +++++++++
 .../hadoopImpl/mapreduce/FileOutputInfoImpl.java   | 159 ---------
 .../mapreduce/InputFormatBuilderImpl.java          | 247 ++++++++++++++
 .../hadoopImpl/mapreduce/InputInfoImpl.java        | 267 ---------------
 .../mapreduce/OutputFormatBuilderImpl.java         |  97 ++++++
 .../hadoopImpl/mapreduce/OutputInfoImpl.java       | 115 -------
 .../lib/MapReduceClientOnDefaultTable.java         |   9 +-
 .../lib/MapReduceClientOnRequiredTable.java        |  10 +-
 .../mapreduce/lib/OutputConfigurator.java          |  86 ++---
 .../its/mapred/AccumuloFileOutputFormatIT.java     | 227 +++++++++++++
 .../hadoop/its/mapred/AccumuloInputFormatIT.java   | 244 ++++++++++++++
 .../hadoop/its/mapred/AccumuloOutputFormatIT.java  | 229 +++++++++++++
 .../its/mapred/AccumuloRowInputFormatIT.java       |   5 +-
 .../accumulo/hadoop/its/mapred/TokenFileIT.java    | 181 ++++++++++
 .../its/mapreduce/AccumuloFileOutputFormatIT.java  | 236 +++++++++++++
 ...putFormatIT.java => AccumuloInputFormatIT.java} |  59 ++--
 .../its/mapreduce/AccumuloOutputFormatIT.java      | 152 +++++++++
 .../its/mapreduce/AccumuloRowInputFormatIT.java    |   5 +-
 .../accumulo/hadoop/its/mapreduce/RowHashIT.java   | 174 ++++++++++
 .../accumulo/hadoop/its/mapreduce/TokenFileIT.java | 174 ++++++++++
 .../mapred/AccumuloFileOutputFormatTest.java       |  14 +-
 .../hadoop/mapred/AccumuloInputFormatTest.java     |  36 +-
 .../hadoop/mapred/AccumuloOutputFormatTest.java    |  27 +-
 .../mapreduce/AccumuloFileOutputFormatTest.java    |  13 +-
 .../hadoop/mapreduce/AccumuloInputFormatTest.java  |  33 +-
 .../hadoop/mapreduce/AccumuloOutputFormatTest.java |  26 +-
 .../test/functional/ConfigurableMacBase.java       |   6 +
 43 files changed, 2815 insertions(+), 1794 deletions(-)

diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java
index d44219d..ff531b9 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java
@@ -16,24 +16,16 @@
  */
 package org.apache.accumulo.hadoop.mapred;
 
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setCompressionType;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setDataBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setFileBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setIndexBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setReplication;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSampler;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSummarizers;
-
 import java.io.IOException;
 
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
+import org.apache.accumulo.hadoop.mapreduce.FileOutputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputFormatBuilderImpl;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator;
 import org.apache.hadoop.conf.Configuration;
@@ -46,18 +38,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 
 /**
- * This class allows MapReduce jobs to write output in the Accumulo data file format.<br>
- * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important
- * requirement of Accumulo data files.
- *
- * <p>
- * The output path to be created must be specified via {@link #setInfo(JobConf, FileOutputInfo)}
- * using {@link FileOutputInfo#builder()}.outputPath(path). For all available options see
- * {@link FileOutputInfo#builder()}
- * <p>
- * Methods inherited from {@link FileOutputFormat} are not supported and may be ignored or cause
- * failures. Using other Hadoop configuration options that affect the behavior of the underlying
- * files directly in the Job's configuration may work, but are not directly supported at this time.
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat
  *
  * @since 2.0
  */
@@ -100,22 +81,8 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
   /**
    * Sets all the information required for this map reduce job.
    */
-  public static void setInfo(JobConf job, FileOutputInfo info) {
-    setOutputPath(job, info.getOutputPath());
-    if (info.getCompressionType().isPresent())
-      setCompressionType(job, info.getCompressionType().get());
-    if (info.getDataBlockSize().isPresent())
-      setDataBlockSize(job, info.getDataBlockSize().get());
-    if (info.getFileBlockSize().isPresent())
-      setFileBlockSize(job, info.getFileBlockSize().get());
-    if (info.getIndexBlockSize().isPresent())
-      setIndexBlockSize(job, info.getIndexBlockSize().get());
-    if (info.getReplication().isPresent())
-      setReplication(job, info.getReplication().get());
-    if (info.getSampler().isPresent())
-      setSampler(job, info.getSampler().get());
-    if (info.getSummarizers().size() > 0)
-      setSummarizers(job, info.getSummarizers().toArray(new SummarizerConfiguration[0]));
+  public static FileOutputFormatBuilder.PathParams<JobConf> configure() {
+    return new FileOutputFormatBuilderImpl<JobConf>();
   }
 
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
index a344a4b..92df89d 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
@@ -16,29 +16,16 @@
  */
 package org.apache.accumulo.hadoop.mapred;
 
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation;
-
 import java.io.IOException;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
 import org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat;
 import org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.RecordReaderBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -48,17 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class allows MapReduce jobs to use Accumulo as the source of data. This
- * {@link org.apache.hadoop.mapred.InputFormat} provides keys and values of type {@link Key} and
- * {@link Value} to the Map function.
- *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloInputFormat#setInfo(JobConf, InputInfo)}
- * </ul>
- *
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat
  *
  * @since 2.0
  */
@@ -115,28 +92,10 @@ public class AccumuloInputFormat implements InputFormat<Key,Value> {
     return recordReader;
   }
 
-  public static void setInfo(JobConf job, InputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    setScanAuthorizations(job, info.getScanAuths());
-    setInputTableName(job, info.getTableName());
-
-    // all optional values
-    if (info.getContext().isPresent())
-      setClassLoaderContext(job, info.getContext().get());
-    if (info.getRanges().size() > 0)
-      setRanges(job, info.getRanges());
-    if (info.getIterators().size() > 0)
-      InputConfigurator.writeIteratorsToConf(CLASS, job, info.getIterators());
-    if (info.getFetchColumns().size() > 0)
-      InputConfigurator.fetchColumns(CLASS, job, info.getFetchColumns());
-    if (info.getSamplerConfig().isPresent())
-      setSamplerConfiguration(job, info.getSamplerConfig().get());
-    if (info.getExecutionHints().size() > 0)
-      setExecutionHints(job, info.getExecutionHints());
-    setAutoAdjustRanges(job, info.isAutoAdjustRanges());
-    setScanIsolation(job, info.isScanIsolation());
-    setLocalIterators(job, info.isLocalIterators());
-    setOfflineTableScan(job, info.isOfflineScan());
-    setBatchScan(job, info.isBatchScan());
+  /**
+   * Sets all the information required for this map reduce job.
+   */
+  public static InputFormatBuilder.ClientParams<JobConf> configure() {
+    return new InputFormatBuilderImpl<JobConf>(CLASS);
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
index dca05a4..47864fa 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
@@ -17,11 +17,6 @@
 package org.apache.accumulo.hadoop.mapred;
 
 import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.getClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setBatchWriterOptions;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setCreateTables;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setDefaultTableName;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setSimulationMode;
 
 import java.io.IOException;
 
@@ -32,8 +27,9 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
+import org.apache.accumulo.hadoop.mapreduce.OutputFormatBuilder;
 import org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl;
+import org.apache.accumulo.hadoopImpl.mapreduce.OutputFormatBuilderImpl;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -42,15 +38,7 @@ import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.util.Progressable;
 
 /**
- * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat}
- * accepts keys and values of type {@link Text} (for a table name) and {@link Mutation} from the Map
- * and Reduce functions.
- *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloOutputFormat#setInfo(JobConf, OutputInfo)}
- * </ul>
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat
  *
  * @since 2.0
  */
@@ -81,14 +69,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
     }
   }
 
-  public static void setInfo(JobConf job, OutputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    if (info.getBatchWriterOptions().isPresent())
-      setBatchWriterOptions(job, info.getBatchWriterOptions().get());
-    if (info.getDefaultTableName().isPresent())
-      setDefaultTableName(job, info.getDefaultTableName().get());
-    setCreateTables(job, info.isCreateTables());
-    setSimulationMode(job, info.isSimulationMode());
+  public static OutputFormatBuilder.ClientParams<JobConf> configure() {
+    return new OutputFormatBuilderImpl<JobConf>();
   }
-
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
index e823d8a..c1c6cbe 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
@@ -16,20 +16,6 @@
  */
 package org.apache.accumulo.hadoop.mapred;
 
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.fetchColumns;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation;
-
 import java.io.IOException;
 import java.util.Map.Entry;
 
@@ -37,10 +23,10 @@ import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.PeekingIterator;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
 import org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat;
 import org.apache.accumulo.hadoopImpl.mapred.InputFormatBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -49,22 +35,12 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
- * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat}
- * provides row names as {@link Text} as keys, and a corresponding {@link PeekingIterator} as a
- * value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map
- * function.
- *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloRowInputFormat#setInfo(JobConf, InputInfo)}
- * </ul>
- *
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloRowInputFormat
  *
  * @since 2.0
  */
 public class AccumuloRowInputFormat implements InputFormat<Text,PeekingIterator<Entry<Key,Value>>> {
+  private static final Class<AccumuloRowInputFormat> CLASS = AccumuloRowInputFormat.class;
 
   /**
    * Gets the splits of the tables that have been set on the job by reading the metadata table for
@@ -123,29 +99,7 @@ public class AccumuloRowInputFormat implements InputFormat<Text,PeekingIterator<
   /**
    * Sets all the information required for this map reduce job.
    */
-  public static void setInfo(JobConf job, InputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    setScanAuthorizations(job, info.getScanAuths());
-    setInputTableName(job, info.getTableName());
-
-    // all optional values
-    if (info.getContext().isPresent())
-      setClassLoaderContext(job, info.getContext().get());
-    if (info.getRanges().size() > 0)
-      setRanges(job, info.getRanges());
-    if (info.getIterators().size() > 0)
-      InputConfigurator.writeIteratorsToConf(AccumuloRowInputFormat.class, job,
-          info.getIterators());
-    if (info.getFetchColumns().size() > 0)
-      fetchColumns(job, info.getFetchColumns());
-    if (info.getSamplerConfig().isPresent())
-      setSamplerConfiguration(job, info.getSamplerConfig().get());
-    if (info.getExecutionHints().size() > 0)
-      setExecutionHints(job, info.getExecutionHints());
-    setAutoAdjustRanges(job, info.isAutoAdjustRanges());
-    setScanIsolation(job, info.isScanIsolation());
-    setLocalIterators(job, info.isLocalIterators());
-    setOfflineTableScan(job, info.isOfflineScan());
-    setBatchScan(job, info.isBatchScan());
+  public static InputFormatBuilder.ClientParams<JobConf> configure() {
+    return new InputFormatBuilderImpl<JobConf>(CLASS);
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java
index 26f559e..2f869e9 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java
@@ -16,23 +16,15 @@
  */
 package org.apache.accumulo.hadoop.mapreduce;
 
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setCompressionType;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setDataBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setFileBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setIndexBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setReplication;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSampler;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSummarizers;
-
 import java.io.IOException;
 
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputFormatBuilderImpl;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator;
 import org.apache.hadoop.conf.Configuration;
@@ -45,16 +37,21 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 /**
  * This class allows MapReduce jobs to write output in the Accumulo data file format.<br>
  * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important
- * requirement of Accumulo data files.
+ * requirement of Accumulo data files. The output path to be created must be specified via
+ * {@link #configure()}, which uses a fluent API. For Example:
+ *
+ * <pre>
+ * AccumuloFileOutputFormat.configure()
+ *      .outputPath(path)
+ *      .fileBlockSize(b)
+ *      .compression(type)
+ *      .summarizers(sc1, sc2).store(job));
+ * </pre>
  *
- * <p>
- * The output path to be created must be specified via {@link #setInfo(Job, FileOutputInfo)} using
- * {@link FileOutputInfo#builder()}.outputPath(path). For all available options see
- * {@link FileOutputInfo#builder()}
- * <p>
- * Methods inherited from {@link FileOutputFormat} are not supported and may be ignored or cause
- * failures. Using other Hadoop configuration options that affect the behavior of the underlying
- * files directly in the Job's configuration may work, but are not directly supported at this time.
+ * For all available options see {@link FileOutputFormatBuilder}. Methods inherited from
+ * {@link FileOutputFormat} are not supported and may be ignored or cause failures. Using other
+ * Hadoop configuration options that affect the behavior of the underlying files directly in the
+ * Job's configuration may work, but are not directly supported at this time.
  *
  * @since 2.0
  */
@@ -95,22 +92,8 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
   /**
    * Sets all the information required for this map reduce job.
    */
-  public static void setInfo(Job job, FileOutputInfo info) {
-    setOutputPath(job, info.getOutputPath());
-    if (info.getCompressionType().isPresent())
-      setCompressionType(job, info.getCompressionType().get());
-    if (info.getDataBlockSize().isPresent())
-      setDataBlockSize(job, info.getDataBlockSize().get());
-    if (info.getFileBlockSize().isPresent())
-      setFileBlockSize(job, info.getFileBlockSize().get());
-    if (info.getIndexBlockSize().isPresent())
-      setIndexBlockSize(job, info.getIndexBlockSize().get());
-    if (info.getReplication().isPresent())
-      setReplication(job, info.getReplication().get());
-    if (info.getSampler().isPresent())
-      setSampler(job, info.getSampler().get());
-    if (info.getSummarizers().size() > 0)
-      setSummarizers(job, info.getSummarizers().toArray(new SummarizerConfiguration[0]));
+  public static FileOutputFormatBuilder.PathParams<Job> configure() {
+    return new FileOutputFormatBuilderImpl<Job>();
   }
 
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
index fa64a5a..3c391a8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
@@ -16,19 +16,6 @@
  */
 package org.apache.accumulo.hadoop.mapreduce;
 
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setScanIsolation;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map.Entry;
@@ -38,7 +25,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
 import org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat;
 import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -50,15 +37,20 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat}
- * provides keys and values of type {@link Key} and {@link Value} to the Map function.
- *
- * The user must specify the following via static configurator method:
+ * provides keys and values of type {@link Key} and {@link Value} to the Map function. Configure the
+ * job using the {@link #configure()} method, which provides a fluent API. For Example:
  *
- * <ul>
- * <li>{@link AccumuloInputFormat#setInfo(Job, InputInfo)}
- * </ul>
+ * <pre>
+ * AccumuloInputFormat.configure().clientInfo(info).table(name).auths(auths) // required
+ *     .addIterator(iter1).ranges(ranges).fetchColumns(columns).executionHints(hints)
+ *     .samplerConfiguration(sampleConf).disableAutoAdjustRanges() // enabled by default
+ *     .scanIsolation() // not available with batchScan()
+ *     .offlineScan() // not available with batchScan()
+ *     .store(job);
+ * </pre>
  *
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * For descriptions of all options see
+ * {@link org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions}
  *
  * @since 2.0
  */
@@ -103,28 +95,7 @@ public class AccumuloInputFormat extends InputFormat<Key,Value> {
   /**
    * Sets all the information required for this map reduce job.
    */
-  public static void setInfo(Job job, InputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    setScanAuthorizations(job, info.getScanAuths());
-    setInputTableName(job, info.getTableName());
-
-    // all optional values
-    if (info.getContext().isPresent())
-      setClassLoaderContext(job, info.getContext().get());
-    if (info.getRanges().size() > 0)
-      setRanges(job, info.getRanges());
-    if (info.getIterators().size() > 0)
-      InputConfigurator.writeIteratorsToConf(CLASS, job.getConfiguration(), info.getIterators());
-    if (info.getFetchColumns().size() > 0)
-      InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), info.getFetchColumns());
-    if (info.getSamplerConfig().isPresent())
-      setSamplerConfiguration(job, info.getSamplerConfig().get());
-    if (info.getExecutionHints().size() > 0)
-      setExecutionHints(job, info.getExecutionHints());
-    setAutoAdjustRanges(job, info.isAutoAdjustRanges());
-    setScanIsolation(job, info.isScanIsolation());
-    setLocalIterators(job, info.isLocalIterators());
-    setOfflineTableScan(job, info.isOfflineScan());
-    setBatchScan(job, info.isBatchScan());
+  public static InputFormatBuilder.ClientParams<Job> configure() {
+    return new InputFormatBuilderImpl<>(CLASS);
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
index a285988..565de0e 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
@@ -17,11 +17,6 @@
 package org.apache.accumulo.hadoop.mapreduce;
 
 import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.getClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setBatchWriterOptions;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setCreateTables;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setDefaultTableName;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setSimulationMode;
 
 import java.io.IOException;
 
@@ -33,6 +28,7 @@ import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl;
+import org.apache.accumulo.hadoopImpl.mapreduce.OutputFormatBuilderImpl;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -45,13 +41,15 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 /**
  * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat}
  * accepts keys and values of type {@link Text} (for a table name) and {@link Mutation} from the Map
- * and Reduce functions.
+ * and Reduce functions. Configured with fluent API using {@link AccumuloOutputFormat#configure()}.
+ * Here is an example with all possible options:
  *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloOutputFormat#setInfo(Job, OutputInfo)}
- * </ul>
+ * <pre>
+ * AccumuloOutputFormat.configure().clientInfo(clientInfo).batchWriterOptions(bwConfig)
+ *     .defaultTable(name).createTables() // disabled by default
+ *     .simulationMode() // disabled by default
+ *     .store(job);
+ * </pre>
  *
  * @since 2.0
  */
@@ -88,14 +86,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
     }
   }
 
-  public static void setInfo(Job job, OutputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    if (info.getBatchWriterOptions().isPresent())
-      setBatchWriterOptions(job, info.getBatchWriterOptions().get());
-    if (info.getDefaultTableName().isPresent())
-      setDefaultTableName(job, info.getDefaultTableName().get());
-    setCreateTables(job, info.isCreateTables());
-    setSimulationMode(job, info.isSimulationMode());
+  /**
+   * Sets all the information required for this map reduce job.
+   */
+  public static OutputFormatBuilder.ClientParams<Job> configure() {
+    return new OutputFormatBuilderImpl<Job>();
   }
 
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java
index d7bba69..899eb28 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java
@@ -16,19 +16,6 @@
  */
 package org.apache.accumulo.hadoop.mapreduce;
 
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setScanIsolation;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map.Entry;
@@ -39,7 +26,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat;
 import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,15 +39,20 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat}
  * provides row names as {@link Text} as keys, and a corresponding {@link PeekingIterator} as a
  * value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map
- * function.
- *
- * The user must specify the following via static configurator method:
+ * function. Configure the job using the {@link #configure()} method, which provides a fluent API.
+ * For Example:
  *
- * <ul>
- * <li>{@link AccumuloRowInputFormat#setInfo(Job, InputInfo)}
- * </ul>
+ * <pre>
+ * AccumuloRowInputFormat.configure().clientInfo(info).table(name).auths(auths) // required
+ *     .addIterator(iter1).ranges(ranges).fetchColumns(columns).executionHints(hints)
+ *     .samplerConfiguration(sampleConf).disableAutoAdjustRanges() // enabled by default
+ *     .scanIsolation() // not available with batchScan()
+ *     .offlineScan() // not available with batchScan()
+ *     .store(job);
+ * </pre>
  *
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * For descriptions of all options see
+ * {@link org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions}
  *
  * @since 2.0
  */
@@ -111,28 +103,7 @@ public class AccumuloRowInputFormat extends InputFormat<Text,PeekingIterator<Ent
   /**
    * Sets all the information required for this map reduce job.
    */
-  public static void setInfo(Job job, InputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    setScanAuthorizations(job, info.getScanAuths());
-    setInputTableName(job, info.getTableName());
-
-    // all optional values
-    if (info.getContext().isPresent())
-      setClassLoaderContext(job, info.getContext().get());
-    if (info.getRanges().size() > 0)
-      setRanges(job, info.getRanges());
-    if (info.getIterators().size() > 0)
-      InputConfigurator.writeIteratorsToConf(CLASS, job.getConfiguration(), info.getIterators());
-    if (info.getFetchColumns().size() > 0)
-      InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), info.getFetchColumns());
-    if (info.getSamplerConfig().isPresent())
-      setSamplerConfiguration(job, info.getSamplerConfig().get());
-    if (info.getExecutionHints().size() > 0)
-      setExecutionHints(job, info.getExecutionHints());
-    setAutoAdjustRanges(job, info.isAutoAdjustRanges());
-    setScanIsolation(job, info.isScanIsolation());
-    setLocalIterators(job, info.isLocalIterators());
-    setOfflineTableScan(job, info.isOfflineScan());
-    setBatchScan(job, info.isBatchScan());
+  public static InputFormatBuilder.ClientParams<Job> configure() {
+    return new InputFormatBuilderImpl<Job>(CLASS);
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputFormatBuilder.java
new file mode 100644
index 0000000..d4e4fc8
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputFormatBuilder.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Builder for all the information needed for the Map Reduce job. Fluent API used by
+ * {@link AccumuloFileOutputFormat#configure()}
+ *
+ * @since 2.0
+ */
+public interface FileOutputFormatBuilder {
+  /**
+   * Required params for builder
+   *
+   * @since 2.0
+   */
+  interface PathParams<T> {
+    /**
+     * Set the Path of the output directory for the map-reduce job.
+     */
+    OutputOptions<T> outputPath(Path path);
+  }
+
+  /**
+   * Options for builder
+   *
+   * @since 2.0
+   */
+  interface OutputOptions<T> {
+    /**
+     * Sets the compression type to use for data blocks, overriding the default. Specifying a
+     * compression may require additional libraries to be available to your Job.
+     *
+     * @param compressionType
+     *          one of "none", "gz", "lzo", or "snappy"
+     */
+    OutputOptions<T> compression(String compressionType);
+
+    /**
+     * Sets the size for data blocks within each file.<br>
+     * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed
+     * as a group.
+     *
+     * <p>
+     * Making this value smaller may increase seek performance, but at the cost of increasing the
+     * size of the indexes (which can also affect seek performance).
+     *
+     * @param dataBlockSize
+     *          the block size, in bytes
+     */
+    OutputOptions<T> dataBlockSize(long dataBlockSize);
+
+    /**
+     * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by
+     * the underlying file system.
+     *
+     * @param fileBlockSize
+     *          the block size, in bytes
+     */
+    OutputOptions<T> fileBlockSize(long fileBlockSize);
+
+    /**
+     * Sets the size for index blocks within each file; smaller blocks means a deeper index
+     * hierarchy within the file, while larger blocks mean a more shallow index hierarchy within the
+     * file. This can affect the performance of queries.
+     *
+     * @param indexBlockSize
+     *          the block size, in bytes
+     */
+    OutputOptions<T> indexBlockSize(long indexBlockSize);
+
+    /**
+     * Sets the file system replication factor for the resulting file, overriding the file system
+     * default.
+     *
+     * @param replication
+     *          the number of replicas for produced files
+     */
+    OutputOptions<T> replication(int replication);
+
+    /**
+     * Specify a sampler to be used when writing out data. This will result in the output file
+     * having sample data.
+     *
+     * @param samplerConfig
+     *          The configuration for creating sample data in the output file.
+     */
+    OutputOptions<T> sampler(SamplerConfiguration samplerConfig);
+
+    /**
+     * Specifies a list of summarizer configurations to create summary data in the output file. Each
+     * Key Value written will be passed to the configured
+     * {@link org.apache.accumulo.core.client.summary.Summarizer}'s.
+     *
+     * @param summarizerConfigs
+     *          summarizer configurations
+     */
+    OutputOptions<T> summarizers(SummarizerConfiguration... summarizerConfigs);
+
+    /**
+     * Finish configuring, verify and serialize options into the Job or JobConf
+     */
+    void store(T job);
+  }
+
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java
deleted file mode 100644
index 70b6043..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoop.mapreduce;
-
-import java.util.Collection;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
-import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputInfoImpl;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Object containing all the information needed for the Map Reduce job. This object is passed to
- * {@link AccumuloFileOutputFormat#setInfo(Job, FileOutputInfo)}. It uses a fluent API like so:
- *
- * <pre>
- * FileOutputInfo.builder()
- *      .outputPath(path)
- *      .fileBlockSize(b)
- *      .compressionType(type)
- *      .summarizers(sc1, sc2).build());
- * </pre>
- *
- * @since 2.0
- */
-public interface FileOutputInfo {
-
-  /**
-   * @return the output path set using FileOutputInfo.builder()...outputPath(path)
-   */
-  public Path getOutputPath();
-
-  /**
-   * @return the compression if set using FileOutputInfo.builder()...compressionType(type)
-   */
-  public Optional<String> getCompressionType();
-
-  /**
-   * @return the data block size if set using FileOutputInfo.builder()...dataBlockSize(size)
-   */
-  public Optional<Long> getDataBlockSize();
-
-  /**
-   * @return the file block size if set using FileOutputInfo.builder()...fileBlockSize(size)
-   */
-  public Optional<Long> getFileBlockSize();
-
-  /**
-   * @return the index block size if set using FileOutputInfo.builder()...indexBlockSize(size)
-   */
-  public Optional<Long> getIndexBlockSize();
-
-  /**
-   * @return the replication if set using FileOutputInfo.builder()...replication(num)
-   */
-  public Optional<Integer> getReplication();
-
-  /**
-   * @return the SamplerConfiguration if set using FileOutputInfo.builder()...sampler(conf)
-   */
-  public Optional<SamplerConfiguration> getSampler();
-
-  /**
-   * @return the summarizers set using FileOutputInfo.builder()...summarizers(conf1, conf2...)
-   */
-  public Collection<SummarizerConfiguration> getSummarizers();
-
-  /**
-   * @return builder for creating a {@link FileOutputInfo}
-   */
-  public static FileOutputInfoBuilder.PathParams builder() {
-    return new FileOutputInfoImpl.FileOutputInfoBuilderImpl();
-  }
-
-  /**
-   * Fluent API builder for FileOutputInfo
-   *
-   * @since 2.0
-   */
-  interface FileOutputInfoBuilder {
-
-    /**
-     * Required params for builder
-     *
-     * @since 2.0
-     */
-    interface PathParams {
-      /**
-       * Set the Path of the output directory for the map-reduce job.
-       */
-      OutputOptions outputPath(Path path);
-    }
-
-    /**
-     * Options for builder
-     *
-     * @since 2.0
-     */
-    interface OutputOptions {
-      /**
-       * Sets the compression type to use for data blocks, overriding the default. Specifying a
-       * compression may require additional libraries to be available to your Job.
-       *
-       * @param compressionType
-       *          one of "none", "gz", "lzo", or "snappy"
-       */
-      OutputOptions compressionType(String compressionType);
-
-      /**
-       * Sets the size for data blocks within each file.<br>
-       * Data blocks are a span of key/value pairs stored in the file that are compressed and
-       * indexed as a group.
-       *
-       * <p>
-       * Making this value smaller may increase seek performance, but at the cost of increasing the
-       * size of the indexes (which can also affect seek performance).
-       *
-       * @param dataBlockSize
-       *          the block size, in bytes
-       */
-      OutputOptions dataBlockSize(long dataBlockSize);
-
-      /**
-       * Sets the size for file blocks in the file system; file blocks are managed, and replicated,
-       * by the underlying file system.
-       *
-       * @param fileBlockSize
-       *          the block size, in bytes
-       */
-      OutputOptions fileBlockSize(long fileBlockSize);
-
-      /**
-       * Sets the size for index blocks within each file; smaller blocks means a deeper index
-       * hierarchy within the file, while larger blocks mean a more shallow index hierarchy within
-       * the file. This can affect the performance of queries.
-       *
-       * @param indexBlockSize
-       *          the block size, in bytes
-       */
-      OutputOptions indexBlockSize(long indexBlockSize);
-
-      /**
-       * Sets the file system replication factor for the resulting file, overriding the file system
-       * default.
-       *
-       * @param replication
-       *          the number of replicas for produced files
-       */
-      OutputOptions replication(int replication);
-
-      /**
-       * Specify a sampler to be used when writing out data. This will result in the output file
-       * having sample data.
-       *
-       * @param samplerConfig
-       *          The configuration for creating sample data in the output file.
-       */
-      OutputOptions sampler(SamplerConfiguration samplerConfig);
-
-      /**
-       * Specifies a list of summarizer configurations to create summary data in the output file.
-       * Each Key Value written will be passed to the configured
-       * {@link org.apache.accumulo.core.client.summary.Summarizer}'s.
-       *
-       * @param summarizerConfigs
-       *          summarizer configurations
-       */
-      OutputOptions summarizers(SummarizerConfiguration... summarizerConfigs);
-
-      /**
-       * @return newly created {@link FileOutputInfo}
-       */
-      FileOutputInfo build();
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
new file mode 100644
index 0000000..1751d9f
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+
+/**
+ * Builder for all the information needed for the Map Reduce job. Fluent API used by
+ * {@link AccumuloInputFormat#configure()}
+ *
+ * @since 2.0
+ */
+public interface InputFormatBuilder {
+  /**
+   * Required params for builder
+   *
+   * @since 2.0
+   */
+  interface ClientParams<T> {
+    /**
+     * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
+     * param can be created using {@link ClientInfo#from(Properties)}
+     *
+     * @param clientInfo
+     *          Accumulo connection information
+     */
+    TableParams<T> clientInfo(ClientInfo clientInfo);
+  }
+
+  /**
+   * Required params for builder
+   *
+   * @since 2.0
+   */
+  interface TableParams<T> {
+    /**
+     * Sets the name of the input table, over which this job will scan.
+     *
+     * @param tableName
+     *          the table to use when the tablename is null in the write call
+     */
+    AuthsParams<T> table(String tableName);
+  }
+
+  /**
+   * Required params for builder
+   *
+   * @since 2.0
+   */
+  interface AuthsParams<T> {
+    /**
+     * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations.
+     * If none present use {@link Authorizations#EMPTY}
+     *
+     * @param auths
+     *          the user's authorizations
+     */
+    InputFormatOptions<T> auths(Authorizations auths);
+  }
+
+  /**
+   * Options for batch scan
+   *
+   * @since 2.0
+   */
+  interface BatchScanOptions<T> {
+    /**
+     * Finish configuring, verify and store options into the JobConf or Job
+     */
+    void store(T t);
+  }
+
+  /**
+   * Options for scan
+   *
+   * @since 2.0
+   */
+  interface ScanOptions<T> extends BatchScanOptions<T> {
+    /**
+     * @see InputFormatOptions#scanIsolation()
+     */
+    ScanOptions<T> scanIsolation();
+
+    /**
+     * @see InputFormatOptions#localIterators()
+     */
+    ScanOptions<T> localIterators();
+
+    /**
+     * @see InputFormatOptions#offlineScan()
+     */
+    ScanOptions<T> offlineScan();
+  }
+
+  /**
+   * Optional values to set using fluent API
+   *
+   * @since 2.0
+   */
+  interface InputFormatOptions<T> {
+    /**
+     * Sets the name of the classloader context on this scanner
+     *
+     * @param context
+     *          name of the classloader context
+     */
+    InputFormatOptions<T> classLoaderContext(String context);
+
+    /**
+     * Sets the input ranges to scan for the single input table associated with this job.
+     *
+     * @param ranges
+     *          the ranges that will be mapped over
+     * @see TableOperations#splitRangeByTablets(String, Range, int)
+     */
+    InputFormatOptions<T> ranges(Collection<Range> ranges);
+
+    /**
+     * Restricts the columns that will be mapped over for this job for the default input table.
+     *
+     * @param fetchColumns
+     *          a collection of IteratorSetting.Column objects corresponding to column family and
+     *          column qualifier. If the column qualifier is null, the entire column family is
+     *          selected. An empty set is the default and is equivalent to scanning all columns.
+     */
+    InputFormatOptions<T> fetchColumns(Collection<IteratorSetting.Column> fetchColumns);
+
+    /**
+     * Encode an iterator on the single input table for this job. It is safe to call this method
+     * multiple times. If an iterator is added with the same name, it will be overridden.
+     *
+     * @param cfg
+     *          the configuration of the iterator
+     */
+    InputFormatOptions<T> addIterator(IteratorSetting cfg);
+
+    /**
+     * Set these execution hints on scanners created for input splits. See
+     * {@link ScannerBase#setExecutionHints(java.util.Map)}
+     */
+    InputFormatOptions<T> executionHints(Map<String,String> hints);
+
+    /**
+     * Causes input format to read sample data. If sample data was created using a different
+     * configuration or a tables sampler configuration changes while reading data, then the input
+     * format will throw an error.
+     *
+     * @param samplerConfig
+     *          The sampler configuration that sample must have been created with inorder for
+     *          reading sample data to succeed.
+     *
+     * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
+     */
+    InputFormatOptions<T> samplerConfiguration(SamplerConfiguration samplerConfig);
+
+    /**
+     * Disables the automatic adjustment of ranges for this job. This feature merges overlapping
+     * ranges, then splits them to align with tablet boundaries. Disabling this feature will cause
+     * exactly one Map task to be created for each specified range. Disabling has no effect for
+     * batch scans at it will always automatically adjust ranges.
+     * <p>
+     * By default, this feature is <b>enabled</b>.
+     *
+     * @see #ranges(Collection)
+     */
+    InputFormatOptions<T> disableAutoAdjustRanges();
+
+    /**
+     * Enables the use of the {@link IsolatedScanner} in this job.
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    ScanOptions<T> scanIsolation();
+
+    /**
+     * Enables the use of the {@link ClientSideIteratorScanner} in this job. This feature will cause
+     * the iterator stack to be constructed within the Map task, rather than within the Accumulo
+     * TServer. To use this feature, all classes needed for those iterators must be available on the
+     * classpath for the task.
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    ScanOptions<T> localIterators();
+
+    /**
+     * Enable reading offline tables. By default, this feature is disabled and only online tables
+     * are scanned. This will make the map reduce job directly read the table's files. If the table
+     * is not offline, then the job will fail. If the table comes online during the map reduce job,
+     * it is likely that the job will fail.
+     * <p>
+     * To use this option, the map reduce user will need access to read the Accumulo directory in
+     * HDFS.
+     * <p>
+     * Reading the offline table will create the scan time iterator stack in the map process. So any
+     * iterators that are configured for the table will need to be on the mapper's classpath.
+     * <p>
+     * One way to use this feature is to clone a table, take the clone offline, and use the clone as
+     * the input table for a map reduce job. If you plan to map reduce over the data many times, it
+     * may be better to the compact the table, clone it, take it offline, and use the clone for all
+     * map reduce jobs. The reason to do this is that compaction will reduce each tablet in the
+     * table to one file, and it is faster to read from one file.
+     * <p>
+     * There are two possible advantages to reading a tables file directly out of HDFS. First, you
+     * may see better read performance. Second, it will support speculative execution better. When
+     * reading an online table speculative execution can put more load on an already slow tablet
+     * server.
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    ScanOptions<T> offlineScan();
+
+    /**
+     * Enables the use of the {@link org.apache.accumulo.core.client.BatchScanner} in this job.
+     * Using this feature will group Ranges by their source tablet, producing an InputSplit per
+     * tablet rather than per Range. This batching helps to reduce overhead when querying a large
+     * number of small ranges. (ex: when doing quad-tree decomposition for spatial queries)
+     * <p>
+     * In order to achieve good locality of InputSplits this option always clips the input Ranges to
+     * tablet boundaries. This may result in one input Range contributing to several InputSplits.
+     * <p>
+     * Note: calls to {@link #disableAutoAdjustRanges()} is ignored when BatchScan is enabled.
+     * <p>
+     * This configuration is incompatible with:
+     * <ul>
+     * <li>{@link #offlineScan()}</li>
+     * <li>{@link #localIterators()}</li>
+     * <li>{@link #scanIsolation()}</li>
+     * </ul>
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    BatchScanOptions<T> batchScan();
+
+    /**
+     * Finish configuring, verify and serialize options into the JobConf or Job
+     */
+    void store(T j);
+  }
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java
deleted file mode 100644
index d443ce4..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoop.mapreduce;
-
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.ClientSideIteratorScanner;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.ScannerBase;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.hadoopImpl.mapreduce.InputInfoImpl;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Object containing all the information needed for the Map Reduce job. This object is passed to the
- * different Input Format types see {@link AccumuloInputFormat#setInfo(Job, InputInfo)}. It uses a
- * fluent API:
- *
- * <pre>
- * InputInfo.builder().clientInfo(info).table(name).scanAuths(auths).build();
- *
- * InputInfo.builder().clientProperties(props).table(name).scanAuths(auths).addIterator(cfg)
- *     .executionHints(hints).build();
- * </pre>
- *
- * @since 2.0
- */
-public interface InputInfo {
-  /**
-   * @return the table name set using InputInfo.builder()...table(name)
-   */
-  String getTableName();
-
-  /**
-   * @return the client info set using InputInfo.builder().clientInfo(info)
-   */
-  ClientInfo getClientInfo();
-
-  /**
-   * @return the scan authorizations set using InputInfo.builder()...scanAuths(auths)
-   */
-  Authorizations getScanAuths();
-
-  /**
-   * @return the context set using InputInfo.builder()...classLoaderContext(context)
-   */
-  Optional<String> getContext();
-
-  /**
-   * @return the Ranges set using InputInfo.builder()...ranges(ranges)
-   */
-  Collection<Range> getRanges();
-
-  /**
-   * @return the ColumnFamily,ColumnQualifier Pairs set using
-   *         InputInfo.builder()...fetchColumns(cfcqPairs)
-   */
-  Collection<IteratorSetting.Column> getFetchColumns();
-
-  /**
-   * @return the collection of IteratorSettings set using InputInfo.builder()...addIterator(cfg)
-   */
-  Collection<IteratorSetting> getIterators();
-
-  /**
-   * @return the SamplerConfiguration set using InputInfo.builder()...samplerConfiguration(cfg)
-   */
-  Optional<SamplerConfiguration> getSamplerConfig();
-
-  /**
-   * @return the Execution Hints set using InputInfo.builder()...executionHints(hints)
-   */
-  Map<String,String> getExecutionHints();
-
-  /**
-   * @return boolean if auto adjusting ranges or not
-   */
-  boolean isAutoAdjustRanges();
-
-  /**
-   * @return boolean if using scan isolation or not
-   */
-  boolean isScanIsolation();
-
-  /**
-   * @return boolean if using local iterators or not
-   */
-  boolean isLocalIterators();
-
-  /**
-   * @return boolean if using offline scan or not
-   */
-  boolean isOfflineScan();
-
-  /**
-   * @return boolean if using batch scanner or not
-   */
-  boolean isBatchScan();
-
-  /**
-   * Builder starting point for map reduce input format information.
-   */
-  static InputInfoBuilder.ClientParams builder() {
-    return new InputInfoImpl.InputInfoBuilderImpl();
-  }
-
-  /**
-   * Required build values to be set.
-   *
-   * @since 2.0
-   */
-  interface InputInfoBuilder {
-    /**
-     * Required params for builder
-     *
-     * @since 2.0
-     */
-    interface ClientParams {
-      /**
-       * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
-       * param can be created using {@link ClientInfo#from(Path)} or
-       * {@link ClientInfo#from(Properties)}
-       *
-       * @param clientInfo
-       *          Accumulo connection information
-       */
-      TableParams clientInfo(ClientInfo clientInfo);
-    }
-
-    /**
-     * Required params for builder
-     *
-     * @since 2.0
-     */
-    interface TableParams {
-      /**
-       * Sets the name of the input table, over which this job will scan.
-       *
-       * @param tableName
-       *          the table to use when the tablename is null in the write call
-       */
-      AuthsParams table(String tableName);
-    }
-
-    /**
-     * Required params for builder
-     *
-     * @since 2.0
-     */
-    interface AuthsParams {
-      /**
-       * Sets the {@link Authorizations} used to scan. Must be a subset of the user's
-       * authorizations. If none present use {@link Authorizations#EMPTY}
-       *
-       * @param auths
-       *          the user's authorizations
-       */
-      InputFormatOptions scanAuths(Authorizations auths);
-    }
-
-    /**
-     * Options for batch scan
-     *
-     * @since 2.0
-     */
-    interface BatchScanOptions {
-      /**
-       * @return newly created {@link InputInfo}
-       */
-      InputInfo build();
-    }
-
-    /**
-     * Options for scan
-     *
-     * @since 2.0
-     */
-    interface ScanOptions extends BatchScanOptions {
-      /**
-       * @see InputFormatOptions#scanIsolation()
-       */
-      ScanOptions scanIsolation();
-
-      /**
-       * @see InputFormatOptions#localIterators()
-       */
-      ScanOptions localIterators();
-
-      /**
-       * @see InputFormatOptions#offlineScan()
-       */
-      ScanOptions offlineScan();
-    }
-
-    /**
-     * Optional values to set using fluent API
-     *
-     * @since 2.0
-     */
-    interface InputFormatOptions {
-      /**
-       * Sets the name of the classloader context on this scanner
-       *
-       * @param context
-       *          name of the classloader context
-       */
-      InputFormatOptions classLoaderContext(String context);
-
-      /**
-       * Sets the input ranges to scan for the single input table associated with this job.
-       *
-       * @param ranges
-       *          the ranges that will be mapped over
-       * @see TableOperations#splitRangeByTablets(String, Range, int)
-       */
-      InputFormatOptions ranges(Collection<Range> ranges);
-
-      /**
-       * Restricts the columns that will be mapped over for this job for the default input table.
-       *
-       * @param fetchColumns
-       *          a collection of IteratorSetting.Column objects corresponding to column family and
-       *          column qualifier. If the column qualifier is null, the entire column family is
-       *          selected. An empty set is the default and is equivalent to scanning all columns.
-       */
-      InputFormatOptions fetchColumns(Collection<IteratorSetting.Column> fetchColumns);
-
-      /**
-       * Encode an iterator on the single input table for this job. It is safe to call this method
-       * multiple times. If an iterator is added with the same name, it will be overridden.
-       *
-       * @param cfg
-       *          the configuration of the iterator
-       */
-      InputFormatOptions addIterator(IteratorSetting cfg);
-
-      /**
-       * Set these execution hints on scanners created for input splits. See
-       * {@link ScannerBase#setExecutionHints(java.util.Map)}
-       */
-      InputFormatOptions executionHints(Map<String,String> hints);
-
-      /**
-       * Causes input format to read sample data. If sample data was created using a different
-       * configuration or a tables sampler configuration changes while reading data, then the input
-       * format will throw an error.
-       *
-       * @param samplerConfig
-       *          The sampler configuration that sample must have been created with inorder for
-       *          reading sample data to succeed.
-       *
-       * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
-       */
-      InputFormatOptions samplerConfiguration(SamplerConfiguration samplerConfig);
-
-      /**
-       * Disables the automatic adjustment of ranges for this job. This feature merges overlapping
-       * ranges, then splits them to align with tablet boundaries. Disabling this feature will cause
-       * exactly one Map task to be created for each specified range. Disabling has no effect for
-       * batch scans at it will always automatically adjust ranges.
-       * <p>
-       * By default, this feature is <b>enabled</b>.
-       *
-       * @see #ranges(Collection)
-       */
-      InputFormatOptions disableAutoAdjustRanges();
-
-      /**
-       * Enables the use of the {@link IsolatedScanner} in this job.
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      ScanOptions scanIsolation();
-
-      /**
-       * Enables the use of the {@link ClientSideIteratorScanner} in this job. This feature will
-       * cause the iterator stack to be constructed within the Map task, rather than within the
-       * Accumulo TServer. To use this feature, all classes needed for those iterators must be
-       * available on the classpath for the task.
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      ScanOptions localIterators();
-
-      /**
-       * Enable reading offline tables. By default, this feature is disabled and only online tables
-       * are scanned. This will make the map reduce job directly read the table's files. If the
-       * table is not offline, then the job will fail. If the table comes online during the map
-       * reduce job, it is likely that the job will fail.
-       * <p>
-       * To use this option, the map reduce user will need access to read the Accumulo directory in
-       * HDFS.
-       * <p>
-       * Reading the offline table will create the scan time iterator stack in the map process. So
-       * any iterators that are configured for the table will need to be on the mapper's classpath.
-       * <p>
-       * One way to use this feature is to clone a table, take the clone offline, and use the clone
-       * as the input table for a map reduce job. If you plan to map reduce over the data many
-       * times, it may be better to the compact the table, clone it, take it offline, and use the
-       * clone for all map reduce jobs. The reason to do this is that compaction will reduce each
-       * tablet in the table to one file, and it is faster to read from one file.
-       * <p>
-       * There are two possible advantages to reading a tables file directly out of HDFS. First, you
-       * may see better read performance. Second, it will support speculative execution better. When
-       * reading an online table speculative execution can put more load on an already slow tablet
-       * server.
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      ScanOptions offlineScan();
-
-      /**
-       * Enables the use of the {@link org.apache.accumulo.core.client.BatchScanner} in this job.
-       * Using this feature will group Ranges by their source tablet, producing an InputSplit per
-       * tablet rather than per Range. This batching helps to reduce overhead when querying a large
-       * number of small ranges. (ex: when doing quad-tree decomposition for spatial queries)
-       * <p>
-       * In order to achieve good locality of InputSplits this option always clips the input Ranges
-       * to tablet boundaries. This may result in one input Range contributing to several
-       * InputSplits.
-       * <p>
-       * Note: calls to {@link #disableAutoAdjustRanges()} is ignored when BatchScan is enabled.
-       * <p>
-       * This configuration is incompatible with:
-       * <ul>
-       * <li>{@link #offlineScan()}</li>
-       * <li>{@link #localIterators()}</li>
-       * <li>{@link #scanIsolation()}</li>
-       * </ul>
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      BatchScanOptions batchScan();
-
-      /**
-       * @return newly created {@link InputInfo}
-       */
-      InputInfo build();
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputFormatBuilder.java
new file mode 100644
index 0000000..e12d803
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputFormatBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.ClientInfo;
+
+/**
+ * Builder for all the information needed for the Map Reduce job. Fluent API used by
+ * {@link AccumuloOutputFormat#configure()}
+ *
+ * @since 2.0
+ */
+public interface OutputFormatBuilder {
+
+  /**
+   * Required params for client
+   *
+   * @since 2.0
+   */
+  interface ClientParams<T> {
+    /**
+     * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
+     * param can be created using {@link ClientInfo#from(Properties)}
+     *
+     * @param clientInfo
+     *          Accumulo connection information
+     */
+    OutputOptions<T> clientInfo(ClientInfo clientInfo);
+  }
+
+  /**
+   * Builder options
+   *
+   * @since 2.0
+   */
+  interface OutputOptions<T> {
+    /**
+     * Sets the default table name to use if one emits a null in place of a table name for a given
+     * mutation. Table names can only be alpha-numeric and underscores.
+     *
+     * @param tableName
+     *          the table to use when the tablename is null in the write call
+     */
+    OutputOptions<T> defaultTable(String tableName);
+
+    /**
+     * Enables the directive to create new tables, as necessary. Table names can only be
+     * alpha-numeric and underscores.
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    OutputOptions<T> createTables();
+
+    /**
+     * Enables the directive to use simulation mode for this job. In simulation mode, no output is
+     * produced. This is useful for testing.
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    OutputOptions<T> simulationMode();
+
+    /**
+     * Finish configuring, verify and serialize options into the Job or JobConf
+     */
+    void store(T j);
+  }
+
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java
deleted file mode 100644
index 0ca7443..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoop.mapreduce;
-
-import java.nio.file.Path;
-import java.util.Optional;
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.hadoopImpl.mapreduce.OutputInfoImpl;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Object containing all the information needed for the Map Reduce job. This object is passed to
- * {@link AccumuloOutputFormat#setInfo(Job, OutputInfo)}. It uses a fluent API like so:
- *
- * <pre>
- * OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build();
- * </pre>
- *
- * @since 2.0
- */
-public interface OutputInfo {
-  /**
-   * @return the client info set using OutputInfo.builder().clientInfo(info)
-   */
-  ClientInfo getClientInfo();
-
-  /**
-   * @return the BatchWriterConfig set using OutputInfo.builder()...batchWriterOptions(conf)
-   */
-  Optional<BatchWriterConfig> getBatchWriterOptions();
-
-  /**
-   * @return the default tame name set using OutputInfo.builder()...defaultTableName(name)
-   */
-  Optional<String> getDefaultTableName();
-
-  /**
-   * @return boolean if creating tables or not
-   */
-  boolean isCreateTables();
-
-  /**
-   * @return boolean if running simulation mode or not
-   */
-  boolean isSimulationMode();
-
-  /**
-   * @return builder for creating a {@link OutputInfo}
-   */
-  public static OutputInfoBuilder.ClientParams builder() {
-    return new OutputInfoImpl.OutputInfoBuilderImpl();
-  }
-
-  /**
-   * Fluent API builder for OutputInfo
-   *
-   * @since 2.0
-   */
-  interface OutputInfoBuilder {
-
-    /**
-     * Required params for client
-     *
-     * @since 2.0
-     */
-    interface ClientParams {
-      /**
-       * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
-       * param can be created using {@link ClientInfo#from(Path)} or
-       * {@link ClientInfo#from(Properties)}
-       *
-       * @param clientInfo
-       *          Accumulo connection information
-       */
-      OutputOptions clientInfo(ClientInfo clientInfo);
-    }
-
-    /**
-     * Builder options
-     *
-     * @since 2.0
-     */
-    interface OutputOptions {
-      /**
-       * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
-       * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the
-       * configuration multiple times overwrites any previous configuration.
-       *
-       * @param bwConfig
-       *          the configuration for the {@link BatchWriter}
-       */
-      OutputOptions batchWriterOptions(BatchWriterConfig bwConfig);
-
-      /**
-       * Sets the default table name to use if one emits a null in place of a table name for a given
-       * mutation. Table names can only be alpha-numeric and underscores.
-       *
-       * @param tableName
-       *          the table to use when the tablename is null in the write call
-       */
-      OutputOptions defaultTableName(String tableName);
-
-      /**
-       * Enables the directive to create new tables, as necessary. Table names can only be
-       * alpha-numeric and underscores.
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      OutputOptions enableCreateTables();
-
-      /**
-       * Enables the directive to use simulation mode for this job. In simulation mode, no output is
-       * produced. This is useful for testing.
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      OutputOptions enableSimulationMode();
-
-      /**
-       * @return newly created {@link OutputInfo}
-       */
-      OutputInfo build();
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java
index 0f52588..49890ff 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java
@@ -27,7 +27,6 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -132,34 +131,6 @@ public class AccumuloOutputFormatImpl {
   }
 
   /**
-   * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
-   * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration
-   * multiple times overwrites any previous configuration.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param bwConfig
-   *          the configuration for the {@link BatchWriter}
-   * @since 1.5.0
-   */
-  public static void setBatchWriterOptions(JobConf job, BatchWriterConfig bwConfig) {
-    OutputConfigurator.setBatchWriterOptions(CLASS, job, bwConfig);
-  }
-
-  /**
-   * Gets the {@link BatchWriterConfig} settings.
-   *
-   * @param job
-   *          the Hadoop context for the configured job
-   * @return the configuration object
-   * @since 1.5.0
-   * @see #setBatchWriterOptions(JobConf, BatchWriterConfig)
-   */
-  public static BatchWriterConfig getBatchWriterOptions(JobConf job) {
-    return OutputConfigurator.getBatchWriterOptions(CLASS, job);
-  }
-
-  /**
    * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric
    * and underscores.
    *
@@ -249,7 +220,7 @@ public class AccumuloOutputFormatImpl {
 
       if (!simulate) {
         this.client = Accumulo.newClient().from(getClientInfo(job)).build();
-        mtbw = client.createMultiTableBatchWriter(getBatchWriterOptions(job));
+        mtbw = client.createMultiTableBatchWriter();
       }
     }
 
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java
index c51efa6..696f7f3 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java
@@ -27,7 +27,6 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -134,34 +133,6 @@ public class AccumuloOutputFormatImpl {
   }
 
   /**
-   * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
-   * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration
-   * multiple times overwrites any previous configuration.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param bwConfig
-   *          the configuration for the {@link BatchWriter}
-   * @since 1.5.0
-   */
-  public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
-    OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig);
-  }
-
-  /**
-   * Gets the {@link BatchWriterConfig} settings.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the configuration object
-   * @since 1.5.0
-   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
-   */
-  public static BatchWriterConfig getBatchWriterOptions(JobContext context) {
-    return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration());
-  }
-
-  /**
    * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric
    * and underscores.
    *
@@ -252,7 +223,7 @@ public class AccumuloOutputFormatImpl {
 
       if (!simulate) {
         this.client = Accumulo.newClient().from(getClientInfo(context)).build();
-        mtbw = client.createMultiTableBatchWriter(getBatchWriterOptions(context));
+        mtbw = client.createMultiTableBatchWriter();
       }
     }
 
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputFormatBuilderImpl.java
new file mode 100644
index 0000000..5e06e1f
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputFormatBuilderImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoopImpl.mapreduce;
+
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setCompressionType;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setDataBlockSize;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setFileBlockSize;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setIndexBlockSize;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setReplication;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSampler;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSummarizers;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.hadoop.mapreduce.FileOutputFormatBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+public class FileOutputFormatBuilderImpl<T> implements FileOutputFormatBuilder,
+    FileOutputFormatBuilder.PathParams<T>, FileOutputFormatBuilder.OutputOptions<T> {
+
+  Path outputPath;
+  Optional<String> comp = Optional.empty();
+  Optional<Long> dataBlockSize = Optional.empty();
+  Optional<Long> fileBlockSize = Optional.empty();
+  Optional<Long> indexBlockSize = Optional.empty();
+  Optional<Integer> replication = Optional.empty();
+  Optional<SamplerConfiguration> sampler = Optional.empty();
+  Collection<SummarizerConfiguration> summarizers = Collections.emptySet();
+
+  @Override
+  public OutputOptions<T> outputPath(Path path) {
+    this.outputPath = Objects.requireNonNull(path);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> compression(String compressionType) {
+    this.comp = Optional.of(compressionType);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> dataBlockSize(long dataBlockSize) {
+    this.dataBlockSize = Optional.of(dataBlockSize);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> fileBlockSize(long fileBlockSize) {
+    this.fileBlockSize = Optional.of(fileBlockSize);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> indexBlockSize(long indexBlockSize) {
+    this.indexBlockSize = Optional.of(indexBlockSize);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> replication(int replication) {
+    this.replication = Optional.of(replication);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> sampler(SamplerConfiguration samplerConfig) {
+    this.sampler = Optional.of(samplerConfig);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> summarizers(SummarizerConfiguration... summarizerConfigs) {
+    this.summarizers = Arrays.asList(Objects.requireNonNull(summarizerConfigs));
+    return this;
+  }
+
+  @Override
+  public void store(T j) {
+    if (j instanceof Job) {
+      store((Job) j);
+    } else if (j instanceof JobConf) {
+      store((JobConf) j);
+    } else {
+      throw new IllegalArgumentException("Unexpected type " + j.getClass().getName());
+    }
+  }
+
+  private void store(Job job) {
+    setOutputPath(job, outputPath);
+    if (comp.isPresent())
+      setCompressionType(job, comp.get());
+    if (dataBlockSize.isPresent())
+      setDataBlockSize(job, dataBlockSize.get());
+    if (fileBlockSize.isPresent())
+      setFileBlockSize(job, fileBlockSize.get());
+    if (indexBlockSize.isPresent())
+      setIndexBlockSize(job, indexBlockSize.get());
+    if (replication.isPresent())
+      setReplication(job, replication.get());
+    if (sampler.isPresent())
+      setSampler(job, sampler.get());
+    if (summarizers.size() > 0)
+      setSummarizers(job, summarizers.toArray(new SummarizerConfiguration[0]));
+  }
+
+  private void store(JobConf job) {
+    org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(job, outputPath);
+    if (comp.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setCompressionType(job,
+          comp.get());
+    if (dataBlockSize.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setDataBlockSize(job,
+          dataBlockSize.get());
+    if (fileBlockSize.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setFileBlockSize(job,
+          fileBlockSize.get());
+    if (indexBlockSize.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setIndexBlockSize(job,
+          indexBlockSize.get());
+    if (replication.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setReplication(job,
+          replication.get());
+    if (sampler.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSampler(job,
+          sampler.get());
+    if (summarizers.size() > 0)
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSummarizers(job,
+          summarizers.toArray(new SummarizerConfiguration[0]));
+  }
+
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java
deleted file mode 100644
index 64d300e..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Objects;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
-import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
-import org.apache.hadoop.fs.Path;
-
-public class FileOutputInfoImpl implements FileOutputInfo {
-  Path outputPath;
-  Optional<String> comp;
-  Optional<Long> dataBlockSize;
-  Optional<Long> fileBlockSize;
-  Optional<Long> indexBlockSize;
-  Optional<Integer> replication;
-  Optional<SamplerConfiguration> sampler;
-  Collection<SummarizerConfiguration> summarizers;
-
-  public FileOutputInfoImpl(Path outputPath, Optional<String> comp, Optional<Long> dataBlockSize,
-      Optional<Long> fileBlockSize, Optional<Long> indexBlockSize, Optional<Integer> replication,
-      Optional<SamplerConfiguration> sampler, Collection<SummarizerConfiguration> summarizers) {
-    this.outputPath = outputPath;
-    this.comp = comp;
-    this.dataBlockSize = dataBlockSize;
-    this.fileBlockSize = fileBlockSize;
-    this.indexBlockSize = indexBlockSize;
-    this.replication = replication;
-    this.sampler = sampler;
-    this.summarizers = summarizers;
-  }
-
-  @Override
-  public Path getOutputPath() {
-    return outputPath;
-  }
-
-  @Override
-  public Optional<String> getCompressionType() {
-    return comp;
-  }
-
-  @Override
-  public Optional<Long> getDataBlockSize() {
-    return dataBlockSize;
-  }
-
-  @Override
-  public Optional<Long> getFileBlockSize() {
-    return fileBlockSize;
-  }
-
-  @Override
-  public Optional<Long> getIndexBlockSize() {
-    return indexBlockSize;
-  }
-
-  @Override
-  public Optional<Integer> getReplication() {
-    return replication;
-  }
-
-  @Override
-  public Optional<SamplerConfiguration> getSampler() {
-    return sampler;
-  }
-
-  @Override
-  public Collection<SummarizerConfiguration> getSummarizers() {
-    return summarizers;
-  }
-
-  public static class FileOutputInfoBuilderImpl implements FileOutputInfoBuilder,
-      FileOutputInfoBuilder.PathParams, FileOutputInfoBuilder.OutputOptions {
-    Path outputPath;
-    Optional<String> comp = Optional.empty();
-    Optional<Long> dataBlockSize = Optional.empty();
-    Optional<Long> fileBlockSize = Optional.empty();
-    Optional<Long> indexBlockSize = Optional.empty();
-    Optional<Integer> replication = Optional.empty();
-    Optional<SamplerConfiguration> sampler = Optional.empty();
-    Collection<SummarizerConfiguration> summarizers = Collections.emptySet();
-
-    @Override
-    public OutputOptions outputPath(Path path) {
-      this.outputPath = Objects.requireNonNull(path);
-      ;
-      return this;
-    }
-
-    @Override
-    public OutputOptions compressionType(String compressionType) {
-      this.comp = Optional.of(compressionType);
-      return this;
-    }
-
-    @Override
-    public OutputOptions dataBlockSize(long dataBlockSize) {
-      this.dataBlockSize = Optional.of(dataBlockSize);
-      return this;
-    }
-
-    @Override
-    public OutputOptions fileBlockSize(long fileBlockSize) {
-      this.fileBlockSize = Optional.of(fileBlockSize);
-      return this;
-    }
-
-    @Override
-    public OutputOptions indexBlockSize(long indexBlockSize) {
-      this.indexBlockSize = Optional.of(indexBlockSize);
-      return this;
-    }
-
-    @Override
-    public OutputOptions replication(int replication) {
-      this.replication = Optional.of(replication);
-      return this;
-    }
-
-    @Override
-    public OutputOptions sampler(SamplerConfiguration samplerConfig) {
-      this.sampler = Optional.of(samplerConfig);
-      return this;
-    }
-
-    @Override
-    public OutputOptions summarizers(SummarizerConfiguration... summarizerConfigs) {
-      this.summarizers = Arrays.asList(Objects.requireNonNull(summarizerConfigs));
-      return this;
-    }
-
-    @Override
-    public FileOutputInfo build() {
-      return new FileOutputInfoImpl(outputPath, comp, dataBlockSize, fileBlockSize, indexBlockSize,
-          replication, sampler, summarizers);
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
new file mode 100644
index 0000000..79deca6
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoopImpl.mapreduce;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class InputFormatBuilderImpl<T> implements InputFormatBuilder,
+    InputFormatBuilder.ClientParams<T>, InputFormatBuilder.TableParams<T>,
+    InputFormatBuilder.AuthsParams<T>, InputFormatBuilder.InputFormatOptions<T>,
+    InputFormatBuilder.ScanOptions<T>, InputFormatBuilder.BatchScanOptions<T> {
+
+  Class<?> callingClass;
+  String tableName;
+  ClientInfo clientInfo;
+  Authorizations scanAuths;
+
+  Optional<String> context = Optional.empty();
+  Collection<Range> ranges = Collections.emptyList();
+  Collection<IteratorSetting.Column> fetchColumns = Collections.emptyList();
+  Map<String,IteratorSetting> iterators = Collections.emptyMap();
+  Optional<SamplerConfiguration> samplerConfig = Optional.empty();
+  Map<String,String> hints = Collections.emptyMap();
+  BuilderBooleans bools = new BuilderBooleans();
+
+  public InputFormatBuilderImpl(Class<?> callingClass) {
+    this.callingClass = callingClass;
+  }
+
+  @Override
+  public InputFormatBuilder.TableParams<T> clientInfo(ClientInfo clientInfo) {
+    this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.AuthsParams<T> table(String tableName) {
+    this.tableName = Objects.requireNonNull(tableName, "Table name must not be null");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> auths(Authorizations auths) {
+    this.scanAuths = Objects.requireNonNull(auths, "Authorizations must not be null");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> classLoaderContext(String context) {
+    this.context = Optional.of(context);
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> ranges(Collection<Range> ranges) {
+    this.ranges = ImmutableList
+        .copyOf(Objects.requireNonNull(ranges, "Collection of ranges is null"));
+    if (this.ranges.size() == 0)
+      throw new IllegalArgumentException("Specified collection of ranges is empty.");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> fetchColumns(
+      Collection<IteratorSetting.Column> fetchColumns) {
+    this.fetchColumns = ImmutableList
+        .copyOf(Objects.requireNonNull(fetchColumns, "Collection of fetch columns is null"));
+    if (this.fetchColumns.size() == 0)
+      throw new IllegalArgumentException("Specified collection of fetch columns is empty.");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> addIterator(IteratorSetting cfg) {
+    // store iterators by name to prevent duplicates
+    Objects.requireNonNull(cfg, "IteratorSetting must not be null.");
+    if (this.iterators.size() == 0)
+      this.iterators = new LinkedHashMap<>();
+    this.iterators.put(cfg.getName(), cfg);
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> executionHints(Map<String,String> hints) {
+    this.hints = ImmutableMap
+        .copyOf(Objects.requireNonNull(hints, "Map of execution hints must not be null."));
+    if (hints.size() == 0)
+      throw new IllegalArgumentException("Specified map of execution hints is empty.");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> samplerConfiguration(
+      SamplerConfiguration samplerConfig) {
+    this.samplerConfig = Optional.of(samplerConfig);
+    return this;
+  }
+
+  @Override
+  public InputFormatOptions<T> disableAutoAdjustRanges() {
+    bools.autoAdjustRanges = false;
+    return this;
+  }
+
+  @Override
+  public ScanOptions<T> scanIsolation() {
+    bools.scanIsolation = true;
+    return this;
+  }
+
+  @Override
+  public ScanOptions<T> localIterators() {
+    bools.localIters = true;
+    return this;
+  }
+
+  @Override
+  public ScanOptions<T> offlineScan() {
+    bools.offlineScan = true;
+    return this;
+  }
+
+  @Override
+  public BatchScanOptions<T> batchScan() {
+    bools.batchScan = true;
+    bools.autoAdjustRanges = true;
+    return this;
+  }
+
+  @Override
+  public void store(T j) {
+    if (j instanceof Job) {
+      store((Job) j);
+    } else if (j instanceof JobConf) {
+      store((JobConf) j);
+    } else {
+      throw new IllegalArgumentException("Unexpected type " + j.getClass().getName());
+    }
+  }
+
+  /**
+   * Final builder method for mapreduce configuration
+   */
+  private void store(Job job) {
+    // TODO validate params are set correctly, possibly call/modify
+    // AbstractInputFormat.validateOptions()
+    AbstractInputFormat.setClientInfo(job, clientInfo);
+    AbstractInputFormat.setScanAuthorizations(job, scanAuths);
+    InputFormatBase.setInputTableName(job, tableName);
+
+    // all optional values
+    if (context.isPresent())
+      AbstractInputFormat.setClassLoaderContext(job, context.get());
+    if (ranges.size() > 0)
+      InputFormatBase.setRanges(job, ranges);
+    if (iterators.size() > 0)
+      InputConfigurator.writeIteratorsToConf(callingClass, job.getConfiguration(),
+          iterators.values());
+    if (fetchColumns.size() > 0)
+      InputConfigurator.fetchColumns(callingClass, job.getConfiguration(), fetchColumns);
+    if (samplerConfig.isPresent())
+      InputFormatBase.setSamplerConfiguration(job, samplerConfig.get());
+    if (hints.size() > 0)
+      InputFormatBase.setExecutionHints(job, hints);
+    InputFormatBase.setAutoAdjustRanges(job, bools.autoAdjustRanges);
+    InputFormatBase.setScanIsolation(job, bools.scanIsolation);
+    InputFormatBase.setLocalIterators(job, bools.localIters);
+    InputFormatBase.setOfflineTableScan(job, bools.offlineScan);
+    InputFormatBase.setBatchScan(job, bools.batchScan);
+  }
+
+  /**
+   * Final builder method for legacy mapred configuration
+   */
+  private void store(JobConf jobConf) {
+    // TODO validate params are set correctly, possibly call/modify
+    // AbstractInputFormat.validateOptions()
+    org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo(jobConf, clientInfo);
+    org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations(jobConf,
+        scanAuths);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName(jobConf, tableName);
+
+    // all optional values
+    if (context.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext(jobConf,
+          context.get());
+    if (ranges.size() > 0)
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges(jobConf, ranges);
+    if (iterators.size() > 0)
+      InputConfigurator.writeIteratorsToConf(callingClass, jobConf, iterators.values());
+    if (fetchColumns.size() > 0)
+      InputConfigurator.fetchColumns(callingClass, jobConf, fetchColumns);
+    if (samplerConfig.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration(jobConf,
+          samplerConfig.get());
+    if (hints.size() > 0)
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints(jobConf, hints);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges(jobConf,
+        bools.autoAdjustRanges);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation(jobConf,
+        bools.scanIsolation);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators(jobConf,
+        bools.localIters);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan(jobConf,
+        bools.offlineScan);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan(jobConf, bools.batchScan);
+  }
+
+  private static class BuilderBooleans {
+    boolean autoAdjustRanges = true;
+    boolean scanIsolation = false;
+    boolean offlineScan = false;
+    boolean localIters = false;
+    boolean batchScan = false;
+  }
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java
deleted file mode 100644
index 878148a..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class InputInfoImpl implements InputInfo {
-  String tableName;
-  ClientInfo clientInfo;
-  Authorizations scanAuths;
-
-  // optional values
-  Optional<String> context;
-  Collection<Range> ranges;
-  Collection<IteratorSetting.Column> fetchColumns;
-  Map<String,IteratorSetting> iterators;
-  Optional<SamplerConfiguration> samplerConfig;
-  Map<String,String> hints;
-  InputInfoBooleans bools;
-
-  public InputInfoImpl(String tableName, ClientInfo clientInfo, Authorizations scanAuths,
-      Optional<String> context, Collection<Range> ranges,
-      Collection<IteratorSetting.Column> fetchColumns, Map<String,IteratorSetting> iterators,
-      Optional<SamplerConfiguration> samplerConfig, Map<String,String> hints,
-      InputInfoBooleans bools) {
-    this.tableName = tableName;
-    this.clientInfo = clientInfo;
-    this.scanAuths = scanAuths;
-    this.context = context;
-    this.ranges = ranges;
-    this.fetchColumns = fetchColumns;
-    this.iterators = iterators;
-    this.samplerConfig = samplerConfig;
-    this.hints = hints;
-    this.bools = bools;
-  }
-
-  @Override
-  public String getTableName() {
-    return tableName;
-  }
-
-  @Override
-  public ClientInfo getClientInfo() {
-    return clientInfo;
-  }
-
-  public Authorizations getScanAuths() {
-    return scanAuths;
-  }
-
-  @Override
-  public Optional<String> getContext() {
-    return context;
-  }
-
-  @Override
-  public Collection<Range> getRanges() {
-    return ranges;
-  }
-
-  @Override
-  public Collection<IteratorSetting.Column> getFetchColumns() {
-    return fetchColumns;
-  }
-
-  @Override
-  public Collection<IteratorSetting> getIterators() {
-    return iterators.values();
-  }
-
-  @Override
-  public Optional<SamplerConfiguration> getSamplerConfig() {
-    return samplerConfig;
-  }
-
-  @Override
-  public Map<String,String> getExecutionHints() {
-    return hints;
-  }
-
-  @Override
-  public boolean isAutoAdjustRanges() {
-    return bools.autoAdjustRanges;
-  }
-
-  @Override
-  public boolean isScanIsolation() {
-    return bools.scanIsolation;
-  }
-
-  @Override
-  public boolean isLocalIterators() {
-    return bools.localIters;
-  }
-
-  @Override
-  public boolean isOfflineScan() {
-    return bools.offlineScan;
-  }
-
-  @Override
-  public boolean isBatchScan() {
-    return bools.batchScan;
-  }
-
-  private static class InputInfoBooleans {
-    boolean autoAdjustRanges = true;
-    boolean scanIsolation = false;
-    boolean offlineScan = false;
-    boolean localIters = false;
-    boolean batchScan = false;
-  }
-
-  public static class InputInfoBuilderImpl
-      implements InputInfoBuilder, InputInfoBuilder.ClientParams, InputInfoBuilder.TableParams,
-      InputInfoBuilder.AuthsParams, InputInfoBuilder.InputFormatOptions,
-      InputInfoBuilder.ScanOptions, InputInfoBuilder.BatchScanOptions {
-
-    String tableName;
-    ClientInfo clientInfo;
-    Authorizations scanAuths;
-
-    Optional<String> context = Optional.empty();
-    Collection<Range> ranges = Collections.emptyList();
-    Collection<IteratorSetting.Column> fetchColumns = Collections.emptyList();
-    Map<String,IteratorSetting> iterators = Collections.emptyMap();
-    Optional<SamplerConfiguration> samplerConfig = Optional.empty();
-    Map<String,String> hints = Collections.emptyMap();
-    InputInfoBooleans bools = new InputInfoBooleans();
-
-    @Override
-    public InputInfoBuilder.TableParams clientInfo(ClientInfo clientInfo) {
-      this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.AuthsParams table(String tableName) {
-      this.tableName = Objects.requireNonNull(tableName, "Table name must not be null");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions scanAuths(Authorizations auths) {
-      this.scanAuths = Objects.requireNonNull(auths, "Authorizations must not be null");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions classLoaderContext(String context) {
-      this.context = Optional.of(context);
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions ranges(Collection<Range> ranges) {
-      this.ranges = ImmutableList
-          .copyOf(Objects.requireNonNull(ranges, "Collection of ranges is null"));
-      if (this.ranges.size() == 0)
-        throw new IllegalArgumentException("Specified collection of ranges is empty.");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions fetchColumns(
-        Collection<IteratorSetting.Column> fetchColumns) {
-      this.fetchColumns = ImmutableList
-          .copyOf(Objects.requireNonNull(fetchColumns, "Collection of fetch columns is null"));
-      if (this.fetchColumns.size() == 0)
-        throw new IllegalArgumentException("Specified collection of fetch columns is empty.");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions addIterator(IteratorSetting cfg) {
-      // store iterators by name to prevent duplicates
-      Objects.requireNonNull(cfg, "IteratorSetting must not be null.");
-      if (this.iterators.size() == 0)
-        this.iterators = new LinkedHashMap<>();
-      this.iterators.put(cfg.getName(), cfg);
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions executionHints(Map<String,String> hints) {
-      this.hints = ImmutableMap
-          .copyOf(Objects.requireNonNull(hints, "Map of execution hints must not be null."));
-      if (hints.size() == 0)
-        throw new IllegalArgumentException("Specified map of execution hints is empty.");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions samplerConfiguration(
-        SamplerConfiguration samplerConfig) {
-      this.samplerConfig = Optional.of(samplerConfig);
-      return this;
-    }
-
-    @Override
-    public InputFormatOptions disableAutoAdjustRanges() {
-      bools.autoAdjustRanges = false;
-      return this;
-    }
-
-    @Override
-    public ScanOptions scanIsolation() {
-      bools.scanIsolation = true;
-      return this;
-    }
-
-    @Override
-    public ScanOptions localIterators() {
-      bools.localIters = true;
-      return this;
-    }
-
-    @Override
-    public ScanOptions offlineScan() {
-      bools.offlineScan = true;
-      return this;
-    }
-
-    @Override
-    public BatchScanOptions batchScan() {
-      bools.batchScan = true;
-      bools.autoAdjustRanges = true;
-      return this;
-    }
-
-    @Override
-    public InputInfo build() {
-      return new InputInfoImpl(tableName, clientInfo, scanAuths, context, ranges, fetchColumns,
-          iterators, samplerConfig, hints, bools);
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
new file mode 100644
index 0000000..0e9a1ca
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoopImpl.mapreduce;
+
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setClientInfo;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setCreateTables;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setDefaultTableName;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setSimulationMode;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.hadoop.mapreduce.OutputFormatBuilder;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+public class OutputFormatBuilderImpl<T>
+    implements OutputFormatBuilder.ClientParams<T>, OutputFormatBuilder.OutputOptions<T> {
+  ClientInfo clientInfo;
+
+  // optional values
+  Optional<String> defaultTableName = Optional.empty();
+  boolean createTables = false;
+  boolean simulationMode = false;
+
+  @Override
+  public OutputFormatBuilder.OutputOptions<T> clientInfo(ClientInfo clientInfo) {
+    this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
+    return this;
+  }
+
+  @Override
+  public OutputFormatBuilder.OutputOptions<T> defaultTable(String tableName) {
+    this.defaultTableName = Optional.of(tableName);
+    return this;
+  }
+
+  @Override
+  public OutputFormatBuilder.OutputOptions<T> createTables() {
+    this.createTables = true;
+    return this;
+  }
+
+  @Override
+  public OutputFormatBuilder.OutputOptions<T> simulationMode() {
+    this.simulationMode = true;
+    return this;
+  }
+
+  @Override
+  public void store(T j) {
+    if (j instanceof Job) {
+      store((Job) j);
+    } else if (j instanceof JobConf) {
+      store((JobConf) j);
+    } else {
+      throw new IllegalArgumentException("Unexpected type " + j.getClass().getName());
+    }
+  }
+
+  private void store(Job job) {
+    setClientInfo(job, clientInfo);
+    if (defaultTableName.isPresent())
+      setDefaultTableName(job, defaultTableName.get());
+    setCreateTables(job, createTables);
+    setSimulationMode(job, simulationMode);
+  }
+
+  private void store(JobConf jobConf) {
+    org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setClientInfo(jobConf,
+        clientInfo);
+    if (defaultTableName.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setDefaultTableName(jobConf,
+          defaultTableName.get());
+    org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setCreateTables(jobConf,
+        createTables);
+    org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setSimulationMode(jobConf,
+        simulationMode);
+
+  }
+
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java
deleted file mode 100644
index 27c94d1..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce;
-
-import java.util.Objects;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
-
-public class OutputInfoImpl implements OutputInfo {
-  ClientInfo clientInfo;
-
-  // optional values
-  Optional<String> defaultTableName;
-  Optional<BatchWriterConfig> bwConfig;
-  boolean createTables;
-  boolean simulationMode;
-
-  public OutputInfoImpl(ClientInfo ci, Optional<String> defaultTableName,
-      Optional<BatchWriterConfig> bwConfig, boolean createTables, boolean simulationMode) {
-    this.clientInfo = ci;
-    this.defaultTableName = defaultTableName;
-    this.bwConfig = bwConfig;
-    this.createTables = createTables;
-    this.simulationMode = simulationMode;
-  }
-
-  @Override
-  public ClientInfo getClientInfo() {
-    return clientInfo;
-  }
-
-  @Override
-  public Optional<BatchWriterConfig> getBatchWriterOptions() {
-    return bwConfig;
-  }
-
-  @Override
-  public Optional<String> getDefaultTableName() {
-    return defaultTableName;
-  }
-
-  @Override
-  public boolean isCreateTables() {
-    return createTables;
-  }
-
-  @Override
-  public boolean isSimulationMode() {
-    return simulationMode;
-  }
-
-  public static class OutputInfoBuilderImpl implements OutputInfoBuilder,
-      OutputInfoBuilder.ClientParams, OutputInfoBuilder.OutputOptions {
-    ClientInfo clientInfo;
-
-    // optional values
-    Optional<String> defaultTableName = Optional.empty();
-    Optional<BatchWriterConfig> bwConfig = Optional.empty();
-    boolean createTables = false;
-    boolean simulationMode = false;
-
-    @Override
-    public OutputOptions clientInfo(ClientInfo clientInfo) {
-      this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
-      return this;
-    }
-
-    @Override
-    public OutputOptions batchWriterOptions(BatchWriterConfig bwConfig) {
-      this.bwConfig = Optional.of(bwConfig);
-      return this;
-    }
-
-    @Override
-    public OutputOptions defaultTableName(String tableName) {
-      this.defaultTableName = Optional.of(tableName);
-      return this;
-    }
-
-    @Override
-    public OutputOptions enableCreateTables() {
-      this.createTables = true;
-      return this;
-    }
-
-    @Override
-    public OutputOptions enableSimulationMode() {
-      this.simulationMode = true;
-      return this;
-    }
-
-    @Override
-    public OutputInfo build() {
-      return new OutputInfoImpl(clientInfo, defaultTableName, bwConfig, createTables,
-          simulationMode);
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
index 772edc1..6f4ff28 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
@@ -19,8 +19,6 @@ package org.apache.accumulo.hadoopImpl.mapreduce.lib;
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
 import org.apache.hadoop.mapreduce.Job;
 
 import com.beust.jcommander.Parameter;
@@ -41,10 +39,9 @@ public class MapReduceClientOnDefaultTable extends MapReduceClientOpts {
   public void setAccumuloConfigs(Job job) {
     final String tableName = getTableName();
     final ClientInfo info = getClientInfo();
-    AccumuloInputFormat.setInfo(job,
-        InputInfo.builder().clientInfo(info).table(tableName).scanAuths(auths).build());
-    AccumuloOutputFormat.setInfo(job, OutputInfo.builder().clientInfo(info)
-        .defaultTableName(tableName).enableCreateTables().build());
+    AccumuloInputFormat.configure().clientInfo(info).table(tableName).auths(auths).store(job);
+    AccumuloOutputFormat.configure().clientInfo(info).defaultTable(tableName).createTables()
+        .store(job);
   }
 
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
index e6c91db..76efb96 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
@@ -19,8 +19,6 @@ package org.apache.accumulo.hadoopImpl.mapreduce.lib;
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
 import org.apache.hadoop.mapreduce.Job;
 
 import com.beust.jcommander.Parameter;
@@ -34,10 +32,10 @@ public class MapReduceClientOnRequiredTable extends MapReduceClientOpts {
   public void setAccumuloConfigs(Job job) {
     final String tableName = getTableName();
     final ClientInfo info = getClientInfo();
-    AccumuloInputFormat.setInfo(job,
-        InputInfo.builder().clientInfo(info).table(tableName).scanAuths(auths).build());
-    AccumuloOutputFormat.setInfo(job, OutputInfo.builder().clientInfo(info)
-        .defaultTableName(tableName).enableCreateTables().build());
+    System.out.println("MIKE here is dahhhhhhhhhhhhhhhhhhh PRINCIPAL= " + info.getPrincipal());
+    AccumuloInputFormat.configure().clientInfo(info).table(tableName).auths(auths).store(job);
+    AccumuloOutputFormat.configure().clientInfo(info).defaultTable(tableName).createTables()
+        .store(job);
   }
 
   public String getTableName() {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java
index 12ce572..9f325d4 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java
@@ -16,16 +16,19 @@
  */
 package org.apache.accumulo.hadoopImpl.mapreduce.lib;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_DURABILITY;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.clientImpl.DurabilityImpl;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -85,61 +88,30 @@ public class OutputConfigurator extends ConfiguratorBase {
   }
 
   /**
-   * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
-   * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration
-   * multiple times overwrites any previous configuration.
-   *
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param bwConfig
-   *          the configuration for the {@link BatchWriter}
-   * @since 1.6.0
-   */
-  public static void setBatchWriterOptions(Class<?> implementingClass, Configuration conf,
-      BatchWriterConfig bwConfig) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    String serialized;
-    try {
-      bwConfig.write(new DataOutputStream(baos));
-      serialized = new String(baos.toByteArray(), UTF_8);
-      baos.close();
-    } catch (IOException e) {
-      throw new IllegalArgumentException(
-          "unable to serialize " + BatchWriterConfig.class.getName());
-    }
-    conf.set(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG), serialized);
-  }
-
-  /**
-   * Gets the {@link BatchWriterConfig} settings.
-   *
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return the configuration object
-   * @since 1.6.0
-   * @see #setBatchWriterOptions(Class, Configuration, BatchWriterConfig)
+   * Gets the {@link BatchWriterConfig} settings that were stored with ClientInfo
    */
   public static BatchWriterConfig getBatchWriterOptions(Class<?> implementingClass,
       Configuration conf) {
-    String serialized = conf.get(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG));
     BatchWriterConfig bwConfig = new BatchWriterConfig();
-    if (serialized == null || serialized.isEmpty()) {
-      return bwConfig;
-    } else {
-      try {
-        ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(UTF_8));
-        bwConfig.readFields(new DataInputStream(bais));
-        bais.close();
-        return bwConfig;
-      } catch (IOException e) {
-        throw new IllegalArgumentException(
-            "unable to serialize " + BatchWriterConfig.class.getName());
-      }
-    }
+    ClientInfo info = getClientInfo(implementingClass, conf);
+    Properties props = info.getProperties();
+    String property = props.getProperty(BATCH_WRITER_DURABILITY.getKey());
+    if (property != null)
+      bwConfig.setDurability(DurabilityImpl.fromString(property));
+    property = props.getProperty(BATCH_WRITER_MAX_LATENCY_SEC.getKey());
+    if (property != null)
+      bwConfig.setMaxLatency(Long.parseLong(property), TimeUnit.MILLISECONDS);
+    property = props.getProperty(BATCH_WRITER_MAX_MEMORY_BYTES.getKey());
+    if (property != null)
+      bwConfig.setMaxMemory(Long.parseLong(property));
+    property = props.getProperty(BATCH_WRITER_MAX_TIMEOUT_SEC.getKey());
+    if (property != null)
+      bwConfig.setTimeout(Long.parseLong(property), TimeUnit.MILLISECONDS);
+    property = props.getProperty(BATCH_WRITER_MAX_WRITE_THREADS.getKey());
+    if (property != null)
+      bwConfig.setMaxWriteThreads(Integer.parseInt(property));
+
+    return bwConfig;
   }
 
   /**
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java
new file mode 100644
index 0000000..e7bd8f7
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.ConfiguratorBase;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapred.AccumuloFileOutputFormat;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(AccumuloFileOutputFormatIT.class);
+  private static final int JOB_VISIBILITY_CACHE_SIZE = 3000;
+  private static final String PREFIX = AccumuloFileOutputFormatIT.class.getSimpleName();
+  private static final String BAD_TABLE = PREFIX + "_mapred_bad_table";
+  private static final String TEST_TABLE = PREFIX + "_mapred_test_table";
+  private static final String EMPTY_TABLE = PREFIX + "_mapred_empty_table";
+
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(
+      RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption("modulus", "3");
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(
+      new File(System.getProperty("user.dir") + "/target"));
+
+  @Test
+  public void testEmptyWrite() throws Exception {
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(EMPTY_TABLE);
+      handleWriteTests(false);
+    }
+  }
+
+  @Test
+  public void testRealWrite() throws Exception {
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(TEST_TABLE);
+      BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
+      Mutation m = new Mutation("Key");
+      m.put("", "", "");
+      bw.addMutation(m);
+      bw.close();
+      handleWriteTests(true);
+    }
+  }
+
+  private static class MRTester extends Configured implements Tool {
+    private static class BadKeyMapper implements Mapper<Key,Value,Key,Value> {
+
+      int index = 0;
+
+      @Override
+      public void map(Key key, Value value, OutputCollector<Key,Value> output, Reporter reporter)
+          throws IOException {
+        try {
+          try {
+            output.collect(key, value);
+            if (index == 2)
+              fail();
+          } catch (Exception e) {
+            log.error(e.toString(), e);
+            assertEquals(2, index);
+          }
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        index++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(2, index);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <table> <outputfile>");
+      }
+
+      String table = args[0];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+      ConfiguratorBase.setVisibilityCacheSize(job, JOB_VISIBILITY_CACHE_SIZE);
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+          .auths(Authorizations.EMPTY).store(job);
+      AccumuloFileOutputFormat.configure().outputPath(new Path(args[1])).sampler(SAMPLER_CONFIG)
+          .store(job);
+
+      job.setMapperClass(BAD_TABLE.equals(table) ? BadKeyMapper.class : IdentityMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloFileOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  private void handleWriteTests(boolean content) throws Exception {
+    File f = folder.newFile(testName.getMethodName());
+    if (f.delete()) {
+      log.debug("Deleted {}", f);
+    }
+    MRTester.main(new String[] {content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
+
+    assertTrue(f.exists());
+    File[] files = f.listFiles(file -> file.getName().startsWith("part-m-"));
+    assertNotNull(files);
+    if (content) {
+      assertEquals(1, files.length);
+      assertTrue(files[0].exists());
+
+      Configuration conf = CachedConfiguration.getInstance();
+      DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
+      FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder()
+          .forFile(files[0].toString(), FileSystem.getLocal(conf), conf)
+          .withTableConfiguration(acuconf).build()
+          .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
+      assertNotNull(sample);
+    } else {
+      assertEquals(0, files.length);
+    }
+  }
+
+  @Test
+  public void writeBadVisibility() throws Exception {
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(BAD_TABLE);
+      BatchWriter bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
+      Mutation m = new Mutation("r1");
+      m.put("cf1", "cq1", "A&B");
+      m.put("cf1", "cq1", "A&B");
+      m.put("cf1", "cq2", "A&");
+      bw.addMutation(m);
+      bw.close();
+      File f = folder.newFile(testName.getMethodName());
+      if (f.delete()) {
+        log.debug("Deleted {}", f);
+      }
+      MRTester.main(new String[] {BAD_TABLE, f.getAbsolutePath()});
+      assertNull(e1);
+      assertNull(e2);
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloInputFormatIT.java
new file mode 100644
index 0000000..083865f
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloInputFormatIT.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapred.RangeInputSplit;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
+
+  @BeforeClass
+  public static void setupClass() {
+    System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
+  }
+
+  private static AssertionError e1 = null;
+  private static int e1Count = 0;
+  private static AssertionError e2 = null;
+  private static int e2Count = 0;
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter)
+          throws IOException {
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+          e1Count++;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+          e2Count++;
+        }
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 1 && args.length != 3) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <table> [<batchScan> <scan sample>]");
+      }
+
+      String table = args[0];
+      Boolean batchScan = false;
+      boolean sample = false;
+      if (args.length == 3) {
+        batchScan = Boolean.parseBoolean(args[1]);
+        sample = Boolean.parseBoolean(args[2]);
+      }
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      InputFormatBuilder.InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure()
+          .clientInfo(getClientInfo()).table(table).auths(Authorizations.EMPTY);
+      if (batchScan)
+        opts.batchScan();
+      if (sample) {
+        opts.samplerConfiguration(SAMPLER_CONFIG);
+      }
+      opts.store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String... args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void testMap() throws Exception {
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(table);
+      BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      e1 = null;
+      e2 = null;
+
+      MRTester.main(table);
+      assertNull(e1);
+      assertNull(e2);
+    }
+  }
+
+  private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(
+      RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption("modulus", "3");
+
+  @Test
+  public void testSample() throws Exception {
+    final String TEST_TABLE_3 = getUniqueNames(1)[0];
+
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(TEST_TABLE_3,
+          new NewTableConfiguration().enableSampling(SAMPLER_CONFIG));
+      BatchWriter bw = c.createBatchWriter(TEST_TABLE_3, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      MRTester.main(TEST_TABLE_3, "False", "True");
+      assertEquals(38, e1Count);
+      assertEquals(1, e2Count);
+
+      e2Count = e1Count = 0;
+      MRTester.main(TEST_TABLE_3, "False", "False");
+      assertEquals(0, e1Count);
+      assertEquals(0, e2Count);
+
+      e2Count = e1Count = 0;
+      MRTester.main(TEST_TABLE_3, "True", "True");
+      assertEquals(38, e1Count);
+      assertEquals(1, e2Count);
+    }
+  }
+
+  @Test
+  public void testCorrectRangeInputSplits() throws Exception {
+    JobConf job = new JobConf();
+
+    String table = getUniqueNames(1)[0];
+    Authorizations auths = new Authorizations("foo");
+    Collection<IteratorSetting.Column> fetchColumns = Collections
+        .singleton(new IteratorSetting.Column(new Text("foo"), new Text("bar")));
+
+    try (AccumuloClient accumuloClient = getAccumuloClient()) {
+      accumuloClient.tableOperations().create(table);
+
+      AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table).auths(auths)
+          .fetchColumns(fetchColumns).scanIsolation().localIterators().store(job);
+
+      AccumuloInputFormat aif = new AccumuloInputFormat();
+
+      InputSplit[] splits = aif.getSplits(job, 1);
+
+      assertEquals(1, splits.length);
+
+      InputSplit split = splits[0];
+
+      assertEquals(RangeInputSplit.class, split.getClass());
+
+      RangeInputSplit risplit = (RangeInputSplit) split;
+
+      assertEquals(table, risplit.getTableName());
+      assertTrue(risplit.isIsolatedScan());
+      assertTrue(risplit.usesLocalIterators());
+      assertEquals(fetchColumns, risplit.getFetchedColumns());
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloOutputFormatIT.java
new file mode 100644
index 0000000..973ebc3
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloOutputFormatIT.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloOutputFormatIT extends ConfigurableMacBase {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "1");
+    cfg.setNumTservers(1);
+  }
+
+  // Prevent regression of ACCUMULO-3709.
+  @Test
+  public void testMapred() throws Exception {
+    try (AccumuloClient accumuloClient = getClient()) {
+      // create a table and put some data in it
+      accumuloClient.tableOperations().create(testName.getMethodName());
+
+      JobConf job = new JobConf();
+      BatchWriterConfig batchConfig = new BatchWriterConfig();
+      // no flushes!!!!!
+      batchConfig.setMaxLatency(0, TimeUnit.MILLISECONDS);
+      // use a single thread to ensure our update session times out
+      batchConfig.setMaxWriteThreads(1);
+      // set the max memory so that we ensure we don't flush on the write.
+      batchConfig.setMaxMemory(Long.MAX_VALUE);
+      AccumuloOutputFormat outputFormat = new AccumuloOutputFormat();
+      AccumuloOutputFormat.configure().clientInfo(getClientInfo(batchConfig)).store(job);
+      RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null);
+
+      try {
+        for (int i = 0; i < 3; i++) {
+          Mutation m = new Mutation(new Text(String.format("%08d", i)));
+          for (int j = 0; j < 3; j++) {
+            m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8)));
+          }
+          writer.write(new Text(testName.getMethodName()), m);
+        }
+
+      } catch (Exception e) {
+        e.printStackTrace();
+        // we don't want the exception to come from write
+      }
+
+      accumuloClient.securityOperations().revokeTablePermission("root", testName.getMethodName(),
+          TablePermission.WRITE);
+
+      try {
+        writer.close(null);
+        fail("Did not throw exception");
+      } catch (IOException ex) {
+        log.info(ex.getMessage(), ex);
+        assertTrue(ex.getCause() instanceof MutationsRejectedException);
+      }
+    }
+  }
+
+  private static AssertionError e1 = null;
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+      OutputCollector<Text,Mutation> finalOutput;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) {
+        finalOutput = output;
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        finalOutput.collect(new Text(), m);
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 6) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName()
+            + " <user> <pass> <inputtable> <outputtable> <instanceName> <zooKeepers>");
+      }
+
+      String user = args[0];
+      String pass = args[1];
+      String table1 = args[2];
+      String table2 = args[3];
+      String instanceName = args[4];
+      String zooKeepers = args[5];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      ClientInfo info = Accumulo.newClient().to(instanceName, zooKeepers).as(user, pass).info();
+
+      AccumuloInputFormat.configure().clientInfo(info).table(table1).auths(Authorizations.EMPTY)
+          .store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.configure().clientInfo(info).defaultTable(table2).store(job);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void testMR() throws Exception {
+    try (AccumuloClient c = getClient()) {
+      String instanceName = getCluster().getInstanceName();
+      String table1 = instanceName + "_t1";
+      String table2 = instanceName + "_t2";
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      MRTester.main(new String[] {"root", ROOT_PASSWORD, table1, table2, instanceName,
+          getCluster().getZooKeepers()});
+      assertNull(e1);
+
+      try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+        assertTrue(iter.hasNext());
+        Entry<Key,Value> entry = iter.next();
+        assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
+        assertFalse(iter.hasNext());
+      }
+    }
+  }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java
index 2147554..c385437 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.hadoop.mapred.AccumuloRowInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -166,8 +165,8 @@ public class AccumuloRowInputFormatIT extends AccumuloClusterHarness {
 
       job.setInputFormat(AccumuloRowInputFormat.class);
 
-      AccumuloRowInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo())
-          .table(table).scanAuths(Authorizations.EMPTY).build());
+      AccumuloRowInputFormat.configure().clientInfo(getClientInfo()).table(table)
+          .auths(Authorizations.EMPTY).store(job);
 
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/TokenFileIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/TokenFileIT.java
new file mode 100644
index 0000000..fb4e4c3
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/TokenFileIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class TokenFileIT extends AccumuloClusterHarness {
+  private static AssertionError e1 = null;
+
+  private static class MRTokenFileTester extends Configured implements Tool {
+    private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+      OutputCollector<Text,Mutation> finalOutput;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter)
+          throws IOException {
+        finalOutput = output;
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        finalOutput.collect(new Text(), m);
+      }
+
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 3) {
+        throw new IllegalArgumentException("Usage : " + MRTokenFileTester.class.getName()
+            + " <token file> <inputtable> <outputtable>");
+      }
+
+      String tokenFile = args[0];
+      ClientInfo ci = ClientInfo.from(Paths.get(tokenFile));
+      String table1 = args[1];
+      String table2 = args[2];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientInfo(ci).table(table1).auths(Authorizations.EMPTY)
+          .store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.configure().clientInfo(ci).defaultTable(table2).store(job);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
+    public static void main(String[] args) throws Exception {
+      Configuration conf = CachedConfiguration.getInstance();
+      conf.set("hadoop.tmp.dir", new File(args[0]).getParent());
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
+    }
+  }
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(
+      new File(System.getProperty("user.dir") + "/target"));
+
+  @Test
+  public void testMR() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      File tf = folder.newFile("client.properties");
+      try (PrintStream out = new PrintStream(tf)) {
+        getClientInfo().getProperties().store(out, "Credentials for " + getClass().getName());
+      }
+
+      MRTokenFileTester.main(new String[] {tf.getAbsolutePath(), table1, table2});
+      assertNull(e1);
+
+      try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+        assertTrue(iter.hasNext());
+        Entry<Key,Value> entry = iter.next();
+        assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
+        assertFalse(iter.hasNext());
+      }
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java
new file mode 100644
index 0000000..7c482a4
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
+
+  private String PREFIX;
+  private String BAD_TABLE;
+  private String TEST_TABLE;
+  private String EMPTY_TABLE;
+
+  private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(
+      RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption("modulus", "3");
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(
+      new File(System.getProperty("user.dir") + "/target"));
+
+  @Before
+  public void setup() throws Exception {
+    PREFIX = testName.getMethodName() + "_";
+    BAD_TABLE = PREFIX + "_mapreduce_bad_table";
+    TEST_TABLE = PREFIX + "_mapreduce_test_table";
+    EMPTY_TABLE = PREFIX + "_mapreduce_empty_table";
+
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(EMPTY_TABLE);
+      c.tableOperations().create(TEST_TABLE);
+      c.tableOperations().create(BAD_TABLE);
+      BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
+      Mutation m = new Mutation("Key");
+      m.put("", "", "");
+      bw.addMutation(m);
+      bw.close();
+      bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
+      m = new Mutation("r1");
+      m.put("cf1", "cq1", "A&B");
+      m.put("cf1", "cq1", "A&B");
+      m.put("cf1", "cq2", "A&");
+      bw.addMutation(m);
+      bw.close();
+    }
+  }
+
+  @Test
+  public void testEmptyWrite() throws Exception {
+    handleWriteTests(false);
+  }
+
+  @Test
+  public void testRealWrite() throws Exception {
+    handleWriteTests(true);
+  }
+
+  private static class MRTester extends Configured implements Tool {
+    private static class BadKeyMapper extends Mapper<Key,Value,Key,Value> {
+      int index = 0;
+
+      @Override
+      protected void map(Key key, Value value, Context context)
+          throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+        try {
+          try {
+            context.write(key, value);
+            if (index == 2)
+              fail();
+          } catch (Exception e) {
+            assertEquals(2, index);
+          }
+        } catch (AssertionError e) {
+          assertionErrors.put(table + "_map", e);
+        }
+        index++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+        try {
+          assertEquals(2, index);
+        } catch (AssertionError e) {
+          assertionErrors.put(table + "_cleanup", e);
+        }
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <table> <outputfile>");
+      }
+
+      String table = args[0];
+      assertionErrors.put(table + "_map", new AssertionError("Dummy_map"));
+      assertionErrors.put(table + "_cleanup", new AssertionError("Dummy_cleanup"));
+
+      Job job = Job.getInstance(getConf(),
+          this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+          .auths(Authorizations.EMPTY).store(job);
+      AccumuloFileOutputFormat.configure().outputPath(new Path(args[1])).sampler(SAMPLER_CONFIG)
+          .store(job);
+
+      job.setMapperClass(
+          table.endsWith("_mapreduce_bad_table") ? BadKeyMapper.class : Mapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+      job.getConfiguration().set("MRTester_tableName", table);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  private void handleWriteTests(boolean content) throws Exception {
+    File f = folder.newFile(testName.getMethodName());
+    assertTrue(f.delete());
+    MRTester.main(new String[] {content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
+
+    assertTrue(f.exists());
+    File[] files = f.listFiles(file -> file.getName().startsWith("part-m-"));
+    assertNotNull(files);
+    if (content) {
+      assertEquals(1, files.length);
+      assertTrue(files[0].exists());
+
+      Configuration conf = CachedConfiguration.getInstance();
+      DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
+      FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder()
+          .forFile(files[0].toString(), FileSystem.getLocal(conf), conf)
+          .withTableConfiguration(acuconf).build()
+          .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
+      assertNotNull(sample);
+    } else {
+      assertEquals(0, files.length);
+    }
+  }
+
+  // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks (to
+  // ensure test correctness),
+  // so error tests should check to see if there is at least one error (could be more depending on
+  // the test) rather than zero
+  private static Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create();
+
+  @Test
+  public void writeBadVisibility() throws Exception {
+    File f = folder.newFile(testName.getMethodName());
+    assertTrue(f.delete());
+    MRTester.main(new String[] {BAD_TABLE, f.getAbsolutePath()});
+    assertTrue(f.exists());
+    assertEquals(1, assertionErrors.get(BAD_TABLE + "_map").size());
+    assertEquals(1, assertionErrors.get(BAD_TABLE + "_cleanup").size());
+  }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/NewAccumuloInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloInputFormatIT.java
similarity index 87%
rename from hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/NewAccumuloInputFormatIT.java
rename to hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloInputFormatIT.java
index a775550..a806726 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/NewAccumuloInputFormatIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloInputFormatIT.java
@@ -48,7 +48,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions;
 import org.apache.accumulo.hadoopImpl.mapreduce.BatchInputSplit;
 import org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -75,7 +75,7 @@ import com.google.common.collect.Multimap;
  *
  * @since 2.0
  */
-public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
 
   AccumuloInputFormat inputFormat;
 
@@ -106,8 +106,8 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
     insertData(table, currentTimeMillis());
 
     Job job = Job.getInstance();
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).scanIsolation().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).scanIsolation().store(job);
 
     // split table
     TreeSet<Text> splitsToAdd = new TreeSet<>();
@@ -126,14 +126,14 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
     List<Range> ranges = new ArrayList<>();
     for (Text text : actualSplits)
       ranges.add(new Range(text));
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).ranges(ranges).build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).ranges(ranges).store(job);
     splits = inputFormat.getSplits(job);
     assertEquals(actualSplits.size(), splits.size());
 
     // offline mode
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).offlineScan().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).offlineScan().store(job);
     try {
       inputFormat.getSplits(job);
       fail("An exception should have been thrown");
@@ -148,19 +148,19 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
     for (int i = 0; i < 5; i++)
       // overlapping ranges
       ranges.add(new Range(String.format("%09d", i), String.format("%09d", i + 2)));
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).ranges(ranges).offlineScan().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).ranges(ranges).offlineScan().store(job);
     splits = inputFormat.getSplits(job);
     assertEquals(2, splits.size());
 
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).disableAutoAdjustRanges().offlineScan().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).disableAutoAdjustRanges().offlineScan().store(job);
     splits = inputFormat.getSplits(job);
     assertEquals(ranges.size(), splits.size());
 
     // BatchScan not available for offline scans
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).batchScan().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).batchScan().store(job);
     try {
       inputFormat.getSplits(job);
       fail("An exception should have been thrown");
@@ -168,28 +168,28 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
 
     // table online tests
     client.tableOperations().online(table, true);
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).store(job);
     // test for resumption of success
     splits = inputFormat.getSplits(job);
     assertEquals(2, splits.size());
 
     // BatchScan not available with isolated iterators
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).scanIsolation().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).scanIsolation().store(job);
 
     splits = inputFormat.getSplits(job);
     assertEquals(2, splits.size());
 
     // BatchScan not available with local iterators
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).localIterators().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).localIterators().store(job);
 
     splits = inputFormat.getSplits(job);
     assertEquals(2, splits.size());
 
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).batchScan().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).batchScan().store(job);
 
     // Check we are getting back correct type pf split
     splits = inputFormat.getSplits(job);
@@ -285,14 +285,14 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
 
       job.setInputFormatClass(inputFormatClass);
 
-      InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder()
-          .clientInfo(getClientInfo()).table(table).scanAuths(Authorizations.EMPTY);
+      InputFormatOptions<Job> opts = AccumuloInputFormat.configure().clientInfo(getClientInfo())
+          .table(table).auths(Authorizations.EMPTY);
       if (sample)
         opts = opts.samplerConfiguration(SAMPLER_CONFIG);
       if (batchScan)
-        AccumuloInputFormat.setInfo(job, opts.batchScan().build());
+        opts.batchScan().store(job);
       else
-        AccumuloInputFormat.setInfo(job, opts.build());
+        opts.store(job);
 
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -406,10 +406,9 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
     AccumuloClient accumuloClient = getAccumuloClient();
     accumuloClient.tableOperations().create(table);
 
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder()
-        .clientInfo(getClientInfo()).table(table).scanAuths(auths);
-    AccumuloInputFormat.setInfo(job,
-        opts.fetchColumns(fetchColumns).scanIsolation().localIterators().build());
+    InputFormatOptions<Job> opts = AccumuloInputFormat.configure().clientInfo(getClientInfo())
+        .table(table).auths(auths);
+    opts.fetchColumns(fetchColumns).scanIsolation().localIterators().store(job);
 
     AccumuloInputFormat aif = new AccumuloInputFormat();
 
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloOutputFormatIT.java
new file mode 100644
index 0000000..badebe9
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloOutputFormatIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloOutputFormatIT extends AccumuloClusterHarness {
+  private static AssertionError e1 = null;
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        context.write(new Text(), m);
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <inputtable> <outputtable>");
+      }
+
+      String table1 = args[0];
+      String table2 = args[1];
+
+      Job job = Job.getInstance(getConf(),
+          this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table1)
+          .auths(Authorizations.EMPTY).store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.configure().clientInfo(getClientInfo()).defaultTable(table2).store(job);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void testMR() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      MRTester.main(new String[] {table1, table2});
+      assertNull(e1);
+
+      try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+        assertTrue(iter.hasNext());
+        Entry<Key,Value> entry = iter.next();
+        assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
+        assertFalse(iter.hasNext());
+      }
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java
index a125a7c..6a836cf 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloRowInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -160,8 +159,8 @@ public class AccumuloRowInputFormatIT extends AccumuloClusterHarness {
 
       job.setInputFormatClass(AccumuloRowInputFormat.class);
 
-      AccumuloRowInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo())
-          .table(table).scanAuths(Authorizations.EMPTY).build());
+      AccumuloRowInputFormat.configure().clientInfo(getClientInfo()).table(table)
+          .auths(Authorizations.EMPTY).store(job);
 
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
new file mode 100644
index 0000000..75c3efb
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+import com.beust.jcommander.Parameter;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class RowHashIT extends ConfigurableMacBase {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  public static final String hadoopTmpDirArg = "-Dhadoop.tmp.dir=" + System.getProperty("user.dir")
+      + "/target/hadoop-tmp";
+
+  static final String tablename = "mapredf";
+  static final String input_cf = "cf-HASHTYPE";
+  static final String input_cq = "cq-NOTHASHED";
+  static final String input_cfcq = input_cf + ":" + input_cq;
+  static final String output_cq = "cq-MD4BASE64";
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient client = getClient()) {
+      runTest(client, getCluster());
+    }
+  }
+
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "md5 is okay for testing")
+  static void runTest(AccumuloClient c, MiniAccumuloClusterImpl cluster) throws AccumuloException,
+      AccumuloSecurityException, TableExistsException, TableNotFoundException,
+      MutationsRejectedException, IOException, InterruptedException, NoSuchAlgorithmException {
+    c.tableOperations().create(tablename);
+    BatchWriter bw = c.createBatchWriter(tablename, new BatchWriterConfig());
+    for (int i = 0; i < 10; i++) {
+      Mutation m = new Mutation("" + i);
+      m.put(input_cf, input_cq, "row" + i);
+      bw.addMutation(m);
+    }
+    bw.close();
+    Process hash = cluster.exec(RowHash.class, Collections.singletonList(hadoopTmpDirArg), "-i",
+        c.info().getInstanceName(), "-z", c.info().getZooKeepers(), "-u", "root", "-p",
+        ROOT_PASSWORD, "-t", tablename, "--column", input_cfcq);
+    assertEquals(0, hash.waitFor());
+
+    try (Scanner s = c.createScanner(tablename, Authorizations.EMPTY)) {
+      s.fetchColumn(new Text(input_cf), new Text(output_cq));
+      int i = 0;
+      for (Entry<Key,Value> entry : s) {
+        MessageDigest md = MessageDigest.getInstance("MD5");
+        byte[] check = Base64.getEncoder().encode(md.digest(("row" + i).getBytes()));
+        assertEquals(entry.getValue().toString(), new String(check));
+        i++;
+      }
+    }
+  }
+
+  public static class RowHash extends Configured implements Tool {
+    /**
+     * The Mapper class that given a row number, will generate the appropriate output line.
+     */
+    public class HashDataMapper extends Mapper<Key,Value,Text,Mutation> {
+      @Override
+      public void map(Key row, Value data, Context context)
+          throws IOException, InterruptedException {
+        Mutation m = new Mutation(row.getRow());
+        m.put(new Text("cf-HASHTYPE"), new Text("cq-MD5BASE64"),
+            new Value(Base64.getEncoder().encode(MD5Hash.digest(data.toString()).getDigest())));
+        context.write(null, m);
+        context.progress();
+      }
+
+      @Override
+      public void setup(Context job) {}
+    }
+
+    public class Opts extends MapReduceClientOnRequiredTable {
+      @Parameter(names = "--column", required = true)
+      String column;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      Job job = Job.getInstance(getConf());
+      job.setJobName(this.getClass().getName());
+      job.setJarByClass(this.getClass());
+      RowHash.Opts opts = new RowHash.Opts();
+      opts.parseArgs(RowHash.class.getName(), args);
+      job.setInputFormatClass(AccumuloInputFormat.class);
+      opts.setAccumuloConfigs(job);
+
+      String col = opts.column;
+      int idx = col.indexOf(":");
+      Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
+      Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
+      if (cf.getLength() > 0)
+        AccumuloInputFormat.configure().clientInfo(opts.getClientInfo()).table(opts.getTableName())
+            .auths(Authorizations.EMPTY)
+            .fetchColumns(Collections.singleton(new IteratorSetting.Column(cf, cq))).store(job);
+
+      job.setMapperClass(RowHash.HashDataMapper.class);
+      job.setMapOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(Mutation.class);
+
+      job.setNumReduceTasks(0);
+
+      job.setOutputFormatClass(AccumuloOutputFormat.class);
+      AccumuloOutputFormat.configure().clientInfo(opts.getClientInfo()).store(job);
+
+      job.waitForCompletion(true);
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      ToolRunner.run(new Configuration(), new RowHash(), args);
+    }
+  }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/TokenFileIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/TokenFileIT.java
new file mode 100644
index 0000000..ae29912
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/TokenFileIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class TokenFileIT extends AccumuloClusterHarness {
+  private static AssertionError e1 = null;
+
+  private static class MRTokenFileTester extends Configured implements Tool {
+    private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        context.write(new Text(), m);
+      }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 3) {
+        throw new IllegalArgumentException("Usage : " + MRTokenFileTester.class.getName()
+            + " <token file> <inputtable> <outputtable>");
+      }
+
+      String tokenFile = args[0];
+      ClientInfo ci = ClientInfo.from(Paths.get(tokenFile));
+      String table1 = args[1];
+      String table2 = args[2];
+
+      Job job = Job.getInstance(getConf(),
+          this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientInfo(ci).table(table1).auths(Authorizations.EMPTY)
+          .store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.configure().clientInfo(ci).defaultTable(table2).store(job);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
+    public static void main(String[] args) throws Exception {
+      Configuration conf = CachedConfiguration.getInstance();
+      conf.set("hadoop.tmp.dir", new File(args[0]).getParent());
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
+    }
+  }
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(
+      new File(System.getProperty("user.dir") + "/target"));
+
+  @Test
+  public void testMR() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      File tf = folder.newFile("client.properties");
+      try (PrintStream out = new PrintStream(tf)) {
+        getClientInfo().getProperties().store(out, "Credentials for " + getClass().getName());
+      }
+
+      MRTokenFileTester.main(new String[] {tf.getAbsolutePath(), table1, table2});
+      assertNull(e1);
+
+      try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+        assertTrue(iter.hasNext());
+        Entry<Key,Value> entry = iter.next();
+        assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
+        assertFalse(iter.hasNext());
+      }
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java
index 7e1db66..c170918 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java
@@ -31,7 +31,6 @@ import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
-import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
@@ -57,10 +56,9 @@ public class AccumuloFileOutputFormatTest {
         .addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build();
 
     JobConf job = new JobConf();
-    AccumuloFileOutputFormat.setInfo(job,
-        FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
-            .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig)
-            .summarizers(sc1, sc2).build());
+    AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+        .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+        .summarizers(sc1, sc2).store(job);
 
     AccumuloConfiguration acuconf = FileOutputConfigurator
         .getAccumuloConfiguration(AccumuloFileOutputFormat.class, job);
@@ -89,9 +87,9 @@ public class AccumuloFileOutputFormatTest {
     samplerConfig.addOption("modulus", "100003");
 
     job = new JobConf();
-    AccumuloFileOutputFormat.setInfo(job,
-        FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
-            .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig).build());
+    AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+        .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+        .store(job);
 
     acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job);
 
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
index e204e8f..6831869 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
@@ -38,7 +38,7 @@ import org.apache.accumulo.core.iterators.user.RegExFilter;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -77,11 +77,11 @@ public class AccumuloInputFormatTest {
    */
   @Test
   public void testSetIterator() throws IOException {
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
+    InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+        .table("test").auths(Authorizations.EMPTY);
 
     IteratorSetting is = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(is).build());
+    opts.addIterator(is).store(job);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     is.write(new DataOutputStream(baos));
     String iterators = job.get("AccumuloInputFormat.ScanOpts.Iterators");
@@ -90,16 +90,15 @@ public class AccumuloInputFormatTest {
 
   @Test
   public void testAddIterator() {
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
+    InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+        .table("test").auths(Authorizations.EMPTY);
 
     IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
     IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class);
     IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class);
     iter3.addOption("v1", "1");
     iter3.addOption("junk", "\0omg:!\\xyzzy");
-    AccumuloInputFormat.setInfo(job,
-        opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+    opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
 
@@ -143,9 +142,9 @@ public class AccumuloInputFormatTest {
     IteratorSetting iter1 = new IteratorSetting(1, "iter1", WholeRowIterator.class);
     iter1.addOption(key, value);
     // also test if reusing options will create duplicate iterators
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).build());
+    InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+        .table("test").auths(Authorizations.EMPTY);
+    opts.addIterator(iter1).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
     assertEquals(1, list.size());
@@ -155,7 +154,7 @@ public class AccumuloInputFormatTest {
     IteratorSetting iter2 = new IteratorSetting(1, "iter2", WholeRowIterator.class);
     iter2.addOption(key, value);
     iter2.addOption(key + "2", value);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).addIterator(iter2).build());
+    opts.addIterator(iter1).addIterator(iter2).store(job);
     list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
     assertEquals(2, list.size());
     assertEquals(1, list.get(0).getOptions().size());
@@ -173,9 +172,8 @@ public class AccumuloInputFormatTest {
     IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class.getName());
     IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class.getName());
     IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class.getName());
-    AccumuloInputFormat.setInfo(job,
-        InputInfo.builder().clientInfo(clientInfo).table("test").scanAuths(Authorizations.EMPTY)
-            .addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
 
@@ -206,8 +204,8 @@ public class AccumuloInputFormatTest {
 
     IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
     RegExFilter.setRegexs(is, regex, null, null, null, false);
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
-        .scanAuths(Authorizations.EMPTY).addIterator(is).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(is).store(job);
 
     assertEquals(regex,
         InputConfigurator.getIterators(AccumuloInputFormat.class, job).get(0).getName());
@@ -221,8 +219,8 @@ public class AccumuloInputFormatTest {
     cols.add(new IteratorSetting.Column(new Text(""), new Text("bar")));
     cols.add(new IteratorSetting.Column(new Text(""), new Text("")));
     cols.add(new IteratorSetting.Column(new Text("foo"), new Text("")));
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
-        .scanAuths(Authorizations.EMPTY).fetchColumns(cols).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .fetchColumns(cols).store(job);
 
     assertEquals(cols, InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job));
   }
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
index 5811ebc..0f9f777 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
@@ -16,21 +16,16 @@
  */
 package org.apache.accumulo.hadoop.mapred;
 
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.getBatchWriterOptions;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import java.io.IOException;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Test;
@@ -38,14 +33,11 @@ import org.junit.Test;
 public class AccumuloOutputFormatTest {
   @Test
   public void testBWSettings() throws IOException {
-    ClientInfo clientInfo = createMock(ClientInfo.class);
-    AuthenticationToken token = createMock(AuthenticationToken.class);
-    Properties props = createMock(Properties.class);
-    expect(clientInfo.getAuthenticationToken()).andReturn(token).anyTimes();
-    expect(clientInfo.getProperties()).andReturn(props).anyTimes();
-    replay(clientInfo);
     JobConf job = new JobConf();
 
+    AccumuloClient.ConnectionOptions opts = Accumulo.newClient().to("test", "zk").as("blah",
+        "blah");
+
     // make sure we aren't testing defaults
     final BatchWriterConfig bwDefaults = new BatchWriterConfig();
     assertNotEquals(7654321L, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
@@ -58,13 +50,14 @@ public class AccumuloOutputFormatTest {
     bwConfig.setTimeout(9898989L, TimeUnit.MILLISECONDS);
     bwConfig.setMaxWriteThreads(42);
     bwConfig.setMaxMemory(1123581321L);
-    AccumuloOutputFormat.setInfo(job,
-        OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build());
+    opts.batchWriterConfig(bwConfig);
+    AccumuloOutputFormat.configure().clientInfo(opts.info()).store(job);
 
     AccumuloOutputFormat myAOF = new AccumuloOutputFormat() {
       @Override
       public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
-        BatchWriterConfig bwOpts = getBatchWriterOptions(job);
+        BatchWriterConfig bwOpts = OutputConfigurator
+            .getBatchWriterOptions(AccumuloOutputFormat.class, job);
 
         // passive check
         assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS),
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java
index ea74826..7aacc14 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java
@@ -56,10 +56,9 @@ public class AccumuloFileOutputFormatTest {
         .addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build();
 
     Job job1 = Job.getInstance();
-    AccumuloFileOutputFormat.setInfo(job1,
-        FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
-            .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig)
-            .summarizers(sc1, sc2).build());
+    AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+        .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+        .summarizers(sc1, sc2).store(job1);
 
     AccumuloConfiguration acuconf = FileOutputConfigurator
         .getAccumuloConfiguration(AccumuloFileOutputFormat.class, job1.getConfiguration());
@@ -88,9 +87,9 @@ public class AccumuloFileOutputFormatTest {
     samplerConfig.addOption("modulus", "100003");
 
     Job job2 = Job.getInstance();
-    AccumuloFileOutputFormat.setInfo(job2,
-        FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
-            .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig).build());
+    AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+        .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+        .store(job2);
 
     acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class,
         job2.getConfiguration());
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
index 4de275b..dfac630 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.iterators.user.RegExFilter;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -64,11 +65,10 @@ public class AccumuloInputFormatTest {
   @Test
   public void testSetIterator() throws IOException {
     Job job = Job.getInstance();
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
 
     IteratorSetting is = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(is).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(is).store(job);
     Configuration conf = job.getConfiguration();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     is.write(new DataOutputStream(baos));
@@ -79,16 +79,14 @@ public class AccumuloInputFormatTest {
   @Test
   public void testAddIterator() throws IOException {
     Job job = Job.getInstance();
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
 
     IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
     IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class);
     IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class);
     iter3.addOption("v1", "1");
     iter3.addOption("junk", "\0omg:!\\xyzzy");
-    AccumuloInputFormat.setInfo(job,
-        opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class,
         job.getConfiguration());
@@ -134,9 +132,9 @@ public class AccumuloInputFormatTest {
     iter1.addOption(key, value);
     Job job = Job.getInstance();
     // also test if reusing options will create duplicate iterators
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).build());
+    InputFormatOptions<Job> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+        .table("test").auths(Authorizations.EMPTY);
+    opts.addIterator(iter1).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class,
         job.getConfiguration());
@@ -147,7 +145,7 @@ public class AccumuloInputFormatTest {
     IteratorSetting iter2 = new IteratorSetting(1, "iter2", WholeRowIterator.class);
     iter2.addOption(key, value);
     iter2.addOption(key + "2", value);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).addIterator(iter2).build());
+    opts.addIterator(iter1).addIterator(iter2).store(job);
     list = InputConfigurator.getIterators(AccumuloInputFormat.class, job.getConfiguration());
     assertEquals(2, list.size());
     assertEquals(1, list.get(0).getOptions().size());
@@ -167,9 +165,8 @@ public class AccumuloInputFormatTest {
     IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class.getName());
     IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class.getName());
     IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class.getName());
-    AccumuloInputFormat.setInfo(job,
-        InputInfo.builder().clientInfo(clientInfo).table("test").scanAuths(Authorizations.EMPTY)
-            .addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class,
         job.getConfiguration());
@@ -203,8 +200,8 @@ public class AccumuloInputFormatTest {
 
     IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
     RegExFilter.setRegexs(is, regex, null, null, null, false);
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
-        .scanAuths(Authorizations.EMPTY).addIterator(is).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(is).store(job);
 
     assertEquals(regex, InputConfigurator
         .getIterators(AccumuloInputFormat.class, job.getConfiguration()).get(0).getName());
@@ -219,8 +216,8 @@ public class AccumuloInputFormatTest {
     cols.add(new IteratorSetting.Column(new Text(""), new Text("bar")));
     cols.add(new IteratorSetting.Column(new Text(""), new Text("")));
     cols.add(new IteratorSetting.Column(new Text("foo"), new Text("")));
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
-        .scanAuths(Authorizations.EMPTY).fetchColumns(cols).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .fetchColumns(cols).store(job);
 
     assertEquals(cols,
         InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job.getConfiguration()));
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
index e698b3a..52aace9 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
@@ -16,20 +16,16 @@
  */
 package org.apache.accumulo.hadoop.mapreduce;
 
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import java.io.IOException;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.junit.Test;
@@ -38,14 +34,11 @@ public class AccumuloOutputFormatTest {
 
   @Test
   public void testBWSettings() throws IOException {
-    ClientInfo clientInfo = createMock(ClientInfo.class);
-    AuthenticationToken token = createMock(AuthenticationToken.class);
-    Properties props = createMock(Properties.class);
-    expect(clientInfo.getAuthenticationToken()).andReturn(token).anyTimes();
-    expect(clientInfo.getProperties()).andReturn(props).anyTimes();
-    replay(clientInfo);
     Job job = Job.getInstance();
 
+    AccumuloClient.ConnectionOptions opts = Accumulo.newClient().to("test", "zk").as("blah",
+        "blah");
+
     // make sure we aren't testing defaults
     final BatchWriterConfig bwDefaults = new BatchWriterConfig();
     assertNotEquals(7654321L, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
@@ -58,13 +51,14 @@ public class AccumuloOutputFormatTest {
     bwConfig.setTimeout(9898989L, TimeUnit.MILLISECONDS);
     bwConfig.setMaxWriteThreads(42);
     bwConfig.setMaxMemory(1123581321L);
-    AccumuloOutputFormat.setInfo(job,
-        OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build());
+    opts.batchWriterConfig(bwConfig);
+    AccumuloOutputFormat.configure().clientInfo(opts.info()).store(job);
 
     AccumuloOutputFormat myAOF = new AccumuloOutputFormat() {
       @Override
       public void checkOutputSpecs(JobContext job) throws IOException {
-        BatchWriterConfig bwOpts = AccumuloOutputFormatImpl.getBatchWriterOptions(job);
+        BatchWriterConfig bwOpts = OutputConfigurator
+            .getBatchWriterOptions(AccumuloOutputFormat.class, job.getConfiguration());
 
         // passive check
         assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS),
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
index de722ef..ba0fb85 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -205,6 +206,11 @@ public class ConfigurableMacBase extends AccumuloITBase {
         .as("root", ROOT_PASSWORD).info();
   }
 
+  protected ClientInfo getClientInfo(BatchWriterConfig bwConfig) {
+    return Accumulo.newClient().to(getCluster().getInstanceName(), getCluster().getZooKeepers())
+        .as("root", ROOT_PASSWORD).batchWriterConfig(bwConfig).info();
+  }
+
   protected ServerContext getServerContext() {
     return getCluster().getServerContext();
   }


Mime
View raw message