accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [accumulo] branch master updated: Revert core/mapred* serialization (#892)
Date Fri, 11 Jan 2019 15:03:23 GMT
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new b78f8e9  Revert core/mapred* serialization (#892)
b78f8e9 is described below

commit b78f8e90789c824fa1802fcfee9c4193dc572606
Author: Christopher Tubbs <ctubbsii@apache.org>
AuthorDate: Fri Jan 11 10:03:18 2019 -0500

    Revert core/mapred* serialization (#892)
    
    * Fix broken core/mapred* ITs
    * Revert serialization to previous (1.9) serialization in core
    * Make use of stored fields in input splits before reading job
    configuration for constructing client
    * Use ConnectorImpl's ClientContext in core/mapred
    * Revert unnecessary closing of ClientContext from ConnectorImpl
    * Revert unnecessary "simplification" of Configurators to manage
    deprecated types (remove unnecessary deprecation of internal
    configurator classes)
    * Slight simplifications to ITs to ease debugging
    * Check failure state when core/mapred ITs fail
---
 .../org/apache/accumulo/core/cli/ClientOpts.java   |   2 +-
 .../core/client/mapred/AbstractInputFormat.java    | 337 ++++++++---------
 .../client/mapred/AccumuloFileOutputFormat.java    |  22 +-
 .../core/client/mapred/AccumuloOutputFormat.java   |  58 ++-
 .../core/client/mapred/InputFormatBase.java        |  43 +--
 .../core/client/mapreduce/AbstractInputFormat.java | 407 ++++++++++-----------
 .../client/mapreduce/AccumuloFileOutputFormat.java |  21 +-
 .../client/mapreduce/AccumuloOutputFormat.java     |  55 ++-
 .../core/client/mapreduce/InputFormatBase.java     |  43 +--
 .../accumulo/core/clientImpl/ConnectorImpl.java    |   3 +-
 .../clientImpl/mapreduce/lib/ConfiguratorBase.java | 306 +++++++++-------
 .../mapreduce/lib/FileOutputConfigurator.java      |   2 -
 .../mapreduce/lib/InputConfigurator.java           | 178 +++++----
 .../mapreduce/lib/OutputConfigurator.java          |   5 -
 .../mapreduce/lib/ConfiguratorBaseTest.java        |  45 +--
 .../apache/accumulo/test/mapred/TokenFileIT.java   |  44 ++-
 .../apache/accumulo/test/mapreduce/RowHash.java    |  22 +-
 .../accumulo/test/mapreduce/TokenFileIT.java       |  31 +-
 18 files changed, 803 insertions(+), 821 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
index dbc98ba..559d70b 100644
--- a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
+++ b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
@@ -103,7 +103,7 @@ public class ClientOpts extends Help {
 
   @Parameter(names = {"-z", "--keepers"},
       description = "Comma separated list of zookeeper hosts (host:port,host:port)")
-  private String zookeepers = null;
+  protected String zookeepers = null;
 
   @Parameter(names = {"-i", "--instance"}, description = "The name of the accumulo instance")
   protected String instance = null;
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index 60f824e..7246d24 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -30,10 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchScanner;
@@ -54,13 +51,13 @@ import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.clientImpl.AuthenticationTokenIdentifier;
 import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.clientImpl.ClientInfo;
 import org.apache.accumulo.core.clientImpl.DelegationTokenImpl;
 import org.apache.accumulo.core.clientImpl.OfflineScanner;
 import org.apache.accumulo.core.clientImpl.ScannerImpl;
 import org.apache.accumulo.core.clientImpl.Table;
 import org.apache.accumulo.core.clientImpl.Tables;
 import org.apache.accumulo.core.clientImpl.TabletLocator;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.InputConfigurator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -88,10 +85,6 @@ import org.apache.log4j.Logger;
 @Deprecated
 public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
 
-  // static wrapper class to make references to deprecated configurator easier
-  private static class Configurator
-      extends org.apache.accumulo.core.clientImpl.mapreduce.lib.InputConfigurator {}
-
   protected static final Class<?> CLASS = AccumuloInputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
 
@@ -105,7 +98,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @since 1.8.0
    */
   public static void setClassLoaderContext(JobConf job, String context) {
-    Configurator.setClassLoaderContext(CLASS, job, context);
+    InputConfigurator.setClassLoaderContext(CLASS, job, context);
   }
 
   /**
@@ -117,7 +110,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @since 1.8.0
    */
   public static String getClassLoaderContext(JobConf job) {
-    return Configurator.getClassLoaderContext(CLASS, job);
+    return InputConfigurator.getClassLoaderContext(CLASS, job);
   }
 
   /**
@@ -144,8 +137,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
       throws AccumuloSecurityException {
     if (token instanceof KerberosToken) {
       log.info("Received KerberosToken, attempting to fetch DelegationToken");
-      try (AccumuloClient client = Accumulo.newClient()
-          .from(Configurator.getClientProperties(CLASS, job)).as(principal, token).build()) {
+      try {
+        ClientContext client = InputConfigurator.client(CLASS, job);
         token = client.securityOperations().getDelegationToken(new DelegationTokenConfig());
       } catch (Exception e) {
         log.warn("Failed to automatically obtain DelegationToken, Mappers/Reducers will likely"
@@ -166,7 +159,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
       job.getCredentials().addToken(hadoopToken.getService(), hadoopToken);
     }
 
-    Configurator.setConnectorInfo(CLASS, job, principal, token);
+    InputConfigurator.setConnectorInfo(CLASS, job, principal, token);
   }
 
   /**
@@ -186,7 +179,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    */
   public static void setConnectorInfo(JobConf job, String principal, String tokenFile)
       throws AccumuloSecurityException {
-    Configurator.setConnectorInfo(CLASS, job, principal, tokenFile);
+    InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
   }
 
   /**
@@ -199,7 +192,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
    */
   protected static Boolean isConnectorInfoSet(JobConf job) {
-    return Configurator.isConnectorInfoSet(CLASS, job);
+    return InputConfigurator.isConnectorInfoSet(CLASS, job);
   }
 
   /**
@@ -212,7 +205,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
    */
   protected static String getPrincipal(JobConf job) {
-    return Configurator.getPrincipal(CLASS, job);
+    return InputConfigurator.getPrincipal(CLASS, job);
   }
 
   /**
@@ -227,8 +220,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @see #setConnectorInfo(JobConf, String, String)
    */
   protected static AuthenticationToken getAuthenticationToken(JobConf job) {
-    AuthenticationToken token = Configurator.getAuthenticationToken(CLASS, job);
-    return Configurator.unwrapAuthenticationToken(job, token);
+    AuthenticationToken token = InputConfigurator.getAuthenticationToken(CLASS, job);
+    return InputConfigurator.unwrapAuthenticationToken(job, token);
   }
 
   /**
@@ -262,7 +255,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    */
   public static void setZooKeeperInstance(JobConf job,
       org.apache.accumulo.core.client.ClientConfiguration clientConfig) {
-    Configurator.setZooKeeperInstance(CLASS, job, clientConfig);
+    InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig);
   }
 
   /**
@@ -276,7 +269,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @see #setZooKeeperInstance(JobConf, org.apache.accumulo.core.client.ClientConfiguration)
    */
   protected static org.apache.accumulo.core.client.Instance getInstance(JobConf job) {
-    return Configurator.getInstance(CLASS, job);
+    return InputConfigurator.getInstance(CLASS, job);
   }
 
   /**
@@ -289,7 +282,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setLogLevel(JobConf job, Level level) {
-    Configurator.setLogLevel(CLASS, job, level);
+    InputConfigurator.setLogLevel(CLASS, job, level);
   }
 
   /**
@@ -302,7 +295,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @see #setLogLevel(JobConf, Level)
    */
   protected static Level getLogLevel(JobConf job) {
-    return Configurator.getLogLevel(CLASS, job);
+    return InputConfigurator.getLogLevel(CLASS, job);
   }
 
   /**
@@ -316,7 +309,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setScanAuthorizations(JobConf job, Authorizations auths) {
-    Configurator.setScanAuthorizations(CLASS, job, auths);
+    InputConfigurator.setScanAuthorizations(CLASS, job, auths);
   }
 
   /**
@@ -329,7 +322,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @see #setScanAuthorizations(JobConf, Authorizations)
    */
   protected static Authorizations getScanAuthorizations(JobConf job) {
-    return Configurator.getScanAuthorizations(CLASS, job);
+    return InputConfigurator.getScanAuthorizations(CLASS, job);
   }
 
   /**
@@ -342,7 +335,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    */
   protected static org.apache.accumulo.core.client.ClientConfiguration getClientConfiguration(
       JobConf job) {
-    return Configurator.getClientConfiguration(CLASS, job);
+    return InputConfigurator.getClientConfiguration(CLASS, job);
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -357,7 +350,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @since 1.5.0
    */
   protected static void validateOptions(JobConf job) throws IOException {
-    Configurator.validatePermissions(CLASS, job);
+    InputConfigurator.validatePermissions(CLASS, job);
   }
 
   /**
@@ -372,7 +365,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    */
   public static Map<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig> getInputTableConfigs(
       JobConf job) {
-    return Configurator.getInputTableConfigs(CLASS, job);
+    return InputConfigurator.getInputTableConfigs(CLASS, job);
   }
 
   /**
@@ -392,7 +385,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    */
   public static org.apache.accumulo.core.client.mapreduce.InputTableConfig getInputTableConfig(
       JobConf job, String tableName) {
-    return Configurator.getInputTableConfig(CLASS, job, tableName);
+    return InputConfigurator.getInputTableConfig(CLASS, job, tableName);
   }
 
   /**
@@ -407,7 +400,6 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * </ul>
    */
   protected abstract static class AbstractRecordReader<K,V> implements RecordReader<K,V> {
-    private ClientContext context = null;
     protected long numKeysRead;
     protected Iterator<Map.Entry<Key,Value>> scannerIterator;
     protected RangeInputSplit split;
@@ -479,10 +471,11 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
       baseSplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) inSplit;
       log.debug("Initializing input split: " + baseSplit);
 
-      if (context == null) {
-        context = new ClientContext(Configurator.getClientProperties(CLASS, job));
+      Authorizations authorizations = baseSplit.getAuths();
+      if (null == authorizations) {
+        authorizations = getScanAuthorizations(job);
       }
-      Authorizations authorizations = getScanAuthorizations(job);
+
       String classLoaderContext = getClassLoaderContext(job);
       String table = baseSplit.getTableName();
 
@@ -491,7 +484,9 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
       org.apache.accumulo.core.client.mapreduce.InputTableConfig tableConfig = getInputTableConfig(
           job, baseSplit.getTableName());
 
-      log.debug("Created client with user: " + context.whoami());
+      ClientContext client = InputConfigurator.client(CLASS, baseSplit, job);
+
+      log.debug("Created client with user: " + client.whoami());
       log.debug("Creating scanner for table: " + table);
       log.debug("Authorizations are: " + authorizations);
 
@@ -503,7 +498,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
           // Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit
           // will not span tablets
           int scanThreads = 1;
-          scanner = context.createBatchScanner(baseSplit.getTableName(), authorizations,
+          scanner = client.createBatchScanner(baseSplit.getTableName(), authorizations,
               scanThreads);
           setupIterators(job, scanner, baseSplit.getTableName(), baseSplit);
           if (classLoaderContext != null) {
@@ -537,10 +532,10 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
 
         try {
           if (isOffline) {
-            scanner = new OfflineScanner(context, Table.ID.of(baseSplit.getTableId()),
+            scanner = new OfflineScanner(client, Table.ID.of(baseSplit.getTableId()),
                 authorizations);
           } else {
-            scanner = new ScannerImpl(context, Table.ID.of(baseSplit.getTableId()), authorizations);
+            scanner = new ScannerImpl(client, Table.ID.of(baseSplit.getTableId()), authorizations);
           }
           if (isIsolated) {
             log.info("Creating isolated scanner");
@@ -592,26 +587,9 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
 
     @Override
     public void close() {
-      // close several objects, aggregating any exceptions thrown
-      Stream.of(scannerBase, context).flatMap(o -> {
-        try {
-          if (o != null) {
-            o.close();
-          }
-          return null;
-        } catch (Exception e) {
-          return Stream.of(e);
-        }
-      }).reduce((e1, e2) -> {
-        e1.addSuppressed(e2);
-        return e1;
-      }).ifPresent(e -> {
-        if (e instanceof RuntimeException) {
-          throw (RuntimeException) e;
-        } else {
-          throw new RuntimeException(e);
-        }
-      });
+      if (scannerBase != null) {
+        scannerBase.close();
+      }
     }
 
     @Override
@@ -633,8 +611,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, Table.ID tableId,
       List<Range> ranges)
       throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-    return Configurator.binOffline(tableId, ranges,
-        ClientInfo.from(Configurator.getClientProperties(CLASS, job)));
+    return InputConfigurator.binOffline(tableId, ranges, InputConfigurator.client(CLASS, job));
   }
 
   /**
@@ -656,146 +633,152 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
     LinkedList<InputSplit> splits = new LinkedList<>();
     Map<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig> tableConfigs = getInputTableConfigs(
         job);
+
     for (Map.Entry<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig> tableConfigEntry : tableConfigs
         .entrySet()) {
+
       String tableName = tableConfigEntry.getKey();
       org.apache.accumulo.core.client.mapreduce.InputTableConfig tableConfig = tableConfigEntry
           .getValue();
 
-      try (
-          ClientContext context = new ClientContext(Configurator.getClientProperties(CLASS, job))) {
-        Table.ID tableId;
-        // resolve table name to id once, and use id from this point forward
-        try {
-          tableId = Tables.getTableId(context, tableName);
-        } catch (TableNotFoundException e) {
-          throw new IOException(e);
-        }
+      ClientContext client;
+      try {
+        client = InputConfigurator.client(CLASS, job);
+      } catch (AccumuloException | AccumuloSecurityException e) {
+        throw new IOException(e);
+      }
 
-        boolean batchScan = Configurator.isBatchScan(CLASS, job);
-        boolean supportBatchScan = !(tableConfig.isOfflineScan()
-            || tableConfig.shouldUseIsolatedScanners() || tableConfig.shouldUseLocalIterators());
-        if (batchScan && !supportBatchScan)
-          throw new IllegalArgumentException("BatchScanner optimization not available for offline"
-              + " scan, isolated, or local iterators");
-
-        boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
-        if (batchScan && !autoAdjust)
-          throw new IllegalArgumentException(
-              "AutoAdjustRanges must be enabled when using BatchScanner optimization");
-
-        List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges())
-            : tableConfig.getRanges();
-        if (ranges.isEmpty()) {
-          ranges = new ArrayList<>(1);
-          ranges.add(new Range());
-        }
+      Table.ID tableId;
+      // resolve table name to id once, and use id from this point forward
+      try {
+        tableId = Tables.getTableId(client, tableName);
+      } catch (TableNotFoundException e) {
+        throw new IOException(e);
+      }
 
-        // get the metadata information for these ranges
-        Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
-        TabletLocator tl;
-        try {
-          if (tableConfig.isOfflineScan()) {
+      boolean batchScan = InputConfigurator.isBatchScan(CLASS, job);
+      boolean supportBatchScan = !(tableConfig.isOfflineScan()
+          || tableConfig.shouldUseIsolatedScanners() || tableConfig.shouldUseLocalIterators());
+      if (batchScan && !supportBatchScan)
+        throw new IllegalArgumentException("BatchScanner optimization not available for offline"
+            + " scan, isolated, or local iterators");
+
+      boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
+      if (batchScan && !autoAdjust)
+        throw new IllegalArgumentException(
+            "AutoAdjustRanges must be enabled when using BatchScanner optimization");
+
+      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges())
+          : tableConfig.getRanges();
+      if (ranges.isEmpty()) {
+        ranges = new ArrayList<>(1);
+        ranges.add(new Range());
+      }
+
+      // get the metadata information for these ranges
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
+      TabletLocator tl;
+      try {
+        if (tableConfig.isOfflineScan()) {
+          binnedRanges = binOfflineTable(job, tableId, ranges);
+          while (binnedRanges == null) {
+            // Some tablets were still online, try again
+            // sleep randomly between 100 and 200 ms
+            sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
             binnedRanges = binOfflineTable(job, tableId, ranges);
-            while (binnedRanges == null) {
-              // Some tablets were still online, try again
-              // sleep randomly between 100 and 200 ms
-              sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
-              binnedRanges = binOfflineTable(job, tableId, ranges);
-            }
-          } else {
-            tl = TabletLocator.getLocator(context, tableId);
-            // its possible that the cache could contain complete, but old information about a
-            // tables tablets... so clear it
+          }
+        } else {
+          tl = TabletLocator.getLocator(client, tableId);
+          // it's possible that the cache could contain complete, but old, information about a
+          // table's tablets... so clear it
+          tl.invalidateCache();
+
+          while (!tl.binRanges(client, ranges, binnedRanges).isEmpty()) {
+            String tableIdStr = tableId.canonicalID();
+            if (!Tables.exists(client, tableId))
+              throw new TableDeletedException(tableIdStr);
+            if (Tables.getTableState(client, tableId) == TableState.OFFLINE)
+              throw new TableOfflineException(Tables.getTableOfflineMsg(client, tableId));
+            binnedRanges.clear();
+            log.warn("Unable to locate bins for specified ranges. Retrying.");
+            // sleep randomly between 100 and 200 ms
+            sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
             tl.invalidateCache();
-
-            while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
-              String tableIdStr = tableId.canonicalID();
-              if (!Tables.exists(context, tableId))
-                throw new TableDeletedException(tableIdStr);
-              if (Tables.getTableState(context, tableId) == TableState.OFFLINE)
-                throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
-              binnedRanges.clear();
-              log.warn("Unable to locate bins for specified ranges. Retrying.");
-              // sleep randomly between 100 and 200 ms
-              sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
-              tl.invalidateCache();
-            }
           }
-        } catch (Exception e) {
-          throw new IOException(e);
         }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
 
-        HashMap<Range,ArrayList<String>> splitsToAdd = null;
-
-        if (!autoAdjust)
-          splitsToAdd = new HashMap<>();
-
-        HashMap<String,String> hostNameCache = new HashMap<>();
-        for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-          String ip = tserverBin.getKey().split(":", 2)[0];
-          String location = hostNameCache.get(ip);
-          if (location == null) {
-            InetAddress inetAddress = InetAddress.getByName(ip);
-            location = inetAddress.getCanonicalHostName();
-            hostNameCache.put(ip, location);
-          }
-          for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-            Range ke = extentRanges.getKey().toDataRange();
-            if (batchScan) {
-              // group ranges by tablet to be read by a BatchScanner
-              ArrayList<Range> clippedRanges = new ArrayList<>();
-              for (Range r : extentRanges.getValue())
-                clippedRanges.add(ke.clip(r));
-
-              org.apache.accumulo.core.clientImpl.mapred.BatchInputSplit split = new org.apache.accumulo.core.clientImpl.mapred.BatchInputSplit(
-                  tableName, tableId, clippedRanges, new String[] {location});
-              org.apache.accumulo.core.clientImpl.mapreduce.SplitUtils.updateSplit(split,
-                  tableConfig, logLevel);
-
-              splits.add(split);
-            } else {
-              // not grouping by tablet
-              for (Range r : extentRanges.getValue()) {
-                if (autoAdjust) {
-                  // divide ranges into smaller ranges, based on the tablets
-                  RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(),
-                      ke.clip(r), new String[] {location});
-                  org.apache.accumulo.core.clientImpl.mapreduce.SplitUtils.updateSplit(split,
-                      tableConfig, logLevel);
-                  split.setOffline(tableConfig.isOfflineScan());
-                  split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
-                  split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
-
-                  splits.add(split);
-                } else {
-                  // don't divide ranges
-                  ArrayList<String> locations = splitsToAdd.get(r);
-                  if (locations == null)
-                    locations = new ArrayList<>(1);
-                  locations.add(location);
-                  splitsToAdd.put(r, locations);
-                }
-              }
-            }
-          }
+      // The code below either adds one split per range (collecting all of its locations) or
+      // divides ranges by tablet and adds one split per range-location pair.
+      // Map from Range to list of locations; only used when ranges are not split (no autoAdjust)
+      HashMap<Range,ArrayList<String>> splitsToAdd = null;
+
+      if (!autoAdjust)
+        splitsToAdd = new HashMap<>();
+
+      HashMap<String,String> hostNameCache = new HashMap<>();
+      for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+        String ip = tserverBin.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getCanonicalHostName();
+          hostNameCache.put(ip, location);
         }
-
-        if (!autoAdjust)
-          for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) {
-            RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(),
-                entry.getKey(), entry.getValue().toArray(new String[0]));
+        for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+          Range ke = extentRanges.getKey().toDataRange();
+          if (batchScan) {
+            // group ranges by tablet to be read by a BatchScanner
+            ArrayList<Range> clippedRanges = new ArrayList<>();
+            for (Range r : extentRanges.getValue())
+              clippedRanges.add(ke.clip(r));
+            org.apache.accumulo.core.clientImpl.mapred.BatchInputSplit split = new org.apache.accumulo.core.clientImpl.mapred.BatchInputSplit(
+                tableName, tableId, clippedRanges, new String[] {location});
             org.apache.accumulo.core.clientImpl.mapreduce.SplitUtils.updateSplit(split, tableConfig,
                 logLevel);
-            split.setOffline(tableConfig.isOfflineScan());
-            split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
-            split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
 
             splits.add(split);
+          } else {
+            // not grouping by tablet
+            for (Range r : extentRanges.getValue()) {
+              if (autoAdjust) {
+                // divide ranges into smaller ranges, based on the tablets
+                RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(),
+                    ke.clip(r), new String[] {location});
+                org.apache.accumulo.core.clientImpl.mapreduce.SplitUtils.updateSplit(split,
+                    tableConfig, logLevel);
+                split.setOffline(tableConfig.isOfflineScan());
+                split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
+                split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
+                splits.add(split);
+              } else {
+                // don't divide ranges
+                ArrayList<String> locations = splitsToAdd.get(r);
+                if (locations == null)
+                  locations = new ArrayList<>(1);
+                locations.add(location);
+                splitsToAdd.put(r, locations);
+              }
+            }
           }
+        }
       }
-    }
 
+      if (!autoAdjust)
+        for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) {
+          RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(),
+              entry.getKey(), entry.getValue().toArray(new String[0]));
+          org.apache.accumulo.core.clientImpl.mapreduce.SplitUtils.updateSplit(split, tableConfig,
+              logLevel);
+          split.setOffline(tableConfig.isOfflineScan());
+          split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
+          split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
+
+          splits.add(split);
+        }
+    }
     return splits.toArray(new InputSplit[splits.size()]);
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
index 2074c8c..79ebeeb 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.FileOutputConfigurator;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -54,10 +55,6 @@ import org.apache.log4j.Logger;
 @Deprecated
 public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
 
-  // static wrapper class to make references to deprecated configurator easier
-  private static class Configurator
-      extends org.apache.accumulo.core.clientImpl.mapreduce.lib.FileOutputConfigurator {}
-
   private static final Class<?> CLASS = AccumuloFileOutputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
 
@@ -72,7 +69,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    * @since 1.5.0
    */
   public static void setCompressionType(JobConf job, String compressionType) {
-    Configurator.setCompressionType(CLASS, job, compressionType);
+    FileOutputConfigurator.setCompressionType(CLASS, job, compressionType);
   }
 
   /**
@@ -91,7 +88,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    * @since 1.5.0
    */
   public static void setDataBlockSize(JobConf job, long dataBlockSize) {
-    Configurator.setDataBlockSize(CLASS, job, dataBlockSize);
+    FileOutputConfigurator.setDataBlockSize(CLASS, job, dataBlockSize);
   }
 
   /**
@@ -105,7 +102,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    * @since 1.5.0
    */
   public static void setFileBlockSize(JobConf job, long fileBlockSize) {
-    Configurator.setFileBlockSize(CLASS, job, fileBlockSize);
+    FileOutputConfigurator.setFileBlockSize(CLASS, job, fileBlockSize);
   }
 
   /**
@@ -120,7 +117,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    * @since 1.5.0
    */
   public static void setIndexBlockSize(JobConf job, long indexBlockSize) {
-    Configurator.setIndexBlockSize(CLASS, job, indexBlockSize);
+    FileOutputConfigurator.setIndexBlockSize(CLASS, job, indexBlockSize);
   }
 
   /**
@@ -134,7 +131,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    * @since 1.5.0
    */
   public static void setReplication(JobConf job, int replication) {
-    Configurator.setReplication(CLASS, job, replication);
+    FileOutputConfigurator.setReplication(CLASS, job, replication);
   }
 
   /**
@@ -149,7 +146,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    */
 
   public static void setSampler(JobConf job, SamplerConfiguration samplerConfig) {
-    Configurator.setSampler(CLASS, job, samplerConfig);
+    FileOutputConfigurator.setSampler(CLASS, job, samplerConfig);
   }
 
   @Override
@@ -157,12 +154,13 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
       Progressable progress) throws IOException {
     // get the path of the temporary output file
     final Configuration conf = job;
-    final AccumuloConfiguration acuConf = Configurator.getAccumuloConfiguration(CLASS, job);
+    final AccumuloConfiguration acuConf = FileOutputConfigurator.getAccumuloConfiguration(CLASS,
+        job);
 
     final String extension = acuConf.get(Property.TABLE_FILE_TYPE);
     final Path file = new Path(getWorkOutputPath(job),
         getUniqueName(job, "part") + "." + extension);
-    final int visCacheSize = Configurator.getVisibilityCacheSize(conf);
+    final int visCacheSize = FileOutputConfigurator.getVisibilityCacheSize(conf);
 
     return new RecordWriter<Key,Value>() {
       RFileWriter out = null;
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
index 921f291..b4afe9c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -41,7 +40,9 @@ import org.apache.accumulo.core.client.security.tokens.DelegationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.clientImpl.AuthenticationTokenIdentifier;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.DelegationTokenImpl;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.OutputConfigurator;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TabletId;
@@ -78,10 +79,6 @@ import org.apache.log4j.Logger;
 @Deprecated
 public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
 
-  // static wrapper class to make references to deprecated configurator easier
-  private static class Configurator
-      extends org.apache.accumulo.core.clientImpl.mapreduce.lib.OutputConfigurator {}
-
   private static final Class<?> CLASS = AccumuloOutputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
 
@@ -111,9 +108,8 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
     if (token instanceof KerberosToken) {
       log.info("Received KerberosToken, attempting to fetch DelegationToken");
       try {
-        org.apache.accumulo.core.client.Instance instance = getInstance(job);
-        org.apache.accumulo.core.client.Connector conn = instance.getConnector(principal, token);
-        token = conn.securityOperations().getDelegationToken(new DelegationTokenConfig());
+        ClientContext client = OutputConfigurator.client(CLASS, job);
+        token = client.securityOperations().getDelegationToken(new DelegationTokenConfig());
       } catch (Exception e) {
         log.warn("Failed to automatically obtain DelegationToken, "
             + "Mappers/Reducers will likely fail to communicate with Accumulo", e);
@@ -133,7 +129,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
       job.getCredentials().addToken(hadoopToken.getService(), hadoopToken);
     }
 
-    Configurator.setConnectorInfo(CLASS, job, principal, token);
+    OutputConfigurator.setConnectorInfo(CLASS, job, principal, token);
   }
 
   /**
@@ -154,7 +150,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    */
   public static void setConnectorInfo(JobConf job, String principal, String tokenFile)
       throws AccumuloSecurityException {
-    Configurator.setConnectorInfo(CLASS, job, principal, tokenFile);
+    OutputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
   }
 
   /**
@@ -167,7 +163,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
    */
   protected static Boolean isConnectorInfoSet(JobConf job) {
-    return Configurator.isConnectorInfoSet(CLASS, job);
+    return OutputConfigurator.isConnectorInfoSet(CLASS, job);
   }
 
   /**
@@ -180,7 +176,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
    */
   protected static String getPrincipal(JobConf job) {
-    return Configurator.getPrincipal(CLASS, job);
+    return OutputConfigurator.getPrincipal(CLASS, job);
   }
 
   /**
@@ -217,8 +213,8 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @see #setConnectorInfo(JobConf, String, String)
    */
   protected static AuthenticationToken getAuthenticationToken(JobConf job) {
-    AuthenticationToken token = Configurator.getAuthenticationToken(CLASS, job);
-    return Configurator.unwrapAuthenticationToken(job, token);
+    AuthenticationToken token = OutputConfigurator.getAuthenticationToken(CLASS, job);
+    return OutputConfigurator.unwrapAuthenticationToken(job, token);
   }
 
   /**
@@ -253,7 +249,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    */
   public static void setZooKeeperInstance(JobConf job,
       org.apache.accumulo.core.client.ClientConfiguration clientConfig) {
-    Configurator.setZooKeeperInstance(CLASS, job, clientConfig);
+    OutputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig);
   }
 
   /**
@@ -267,7 +263,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @see #setZooKeeperInstance(JobConf, org.apache.accumulo.core.client.ClientConfiguration)
    */
   protected static org.apache.accumulo.core.client.Instance getInstance(JobConf job) {
-    return Configurator.getInstance(CLASS, job);
+    return OutputConfigurator.getInstance(CLASS, job);
   }
 
   /**
@@ -280,7 +276,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   public static void setLogLevel(JobConf job, Level level) {
-    Configurator.setLogLevel(CLASS, job, level);
+    OutputConfigurator.setLogLevel(CLASS, job, level);
   }
 
   /**
@@ -293,7 +289,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @see #setLogLevel(JobConf, Level)
    */
   protected static Level getLogLevel(JobConf job) {
-    return Configurator.getLogLevel(CLASS, job);
+    return OutputConfigurator.getLogLevel(CLASS, job);
   }
 
   /**
@@ -307,7 +303,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   public static void setDefaultTableName(JobConf job, String tableName) {
-    Configurator.setDefaultTableName(CLASS, job, tableName);
+    OutputConfigurator.setDefaultTableName(CLASS, job, tableName);
   }
 
   /**
@@ -320,7 +316,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @see #setDefaultTableName(JobConf, String)
    */
   protected static String getDefaultTableName(JobConf job) {
-    return Configurator.getDefaultTableName(CLASS, job);
+    return OutputConfigurator.getDefaultTableName(CLASS, job);
   }
 
   /**
@@ -335,7 +331,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   public static void setBatchWriterOptions(JobConf job, BatchWriterConfig bwConfig) {
-    Configurator.setBatchWriterOptions(CLASS, job, bwConfig);
+    OutputConfigurator.setBatchWriterOptions(CLASS, job, bwConfig);
   }
 
   /**
@@ -348,7 +344,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @see #setBatchWriterOptions(JobConf, BatchWriterConfig)
    */
   protected static BatchWriterConfig getBatchWriterOptions(JobConf job) {
-    return Configurator.getBatchWriterOptions(CLASS, job);
+    return OutputConfigurator.getBatchWriterOptions(CLASS, job);
   }
 
   /**
@@ -365,7 +361,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   public static void setCreateTables(JobConf job, boolean enableFeature) {
-    Configurator.setCreateTables(CLASS, job, enableFeature);
+    OutputConfigurator.setCreateTables(CLASS, job, enableFeature);
   }
 
   /**
@@ -378,7 +374,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @see #setCreateTables(JobConf, boolean)
    */
   protected static Boolean canCreateTables(JobConf job) {
-    return Configurator.canCreateTables(CLASS, job);
+    return OutputConfigurator.canCreateTables(CLASS, job);
   }
 
   /**
@@ -395,7 +391,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   public static void setSimulationMode(JobConf job, boolean enableFeature) {
-    Configurator.setSimulationMode(CLASS, job, enableFeature);
+    OutputConfigurator.setSimulationMode(CLASS, job, enableFeature);
   }
 
   /**
@@ -408,7 +404,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @see #setSimulationMode(JobConf, boolean)
    */
   protected static Boolean getSimulationMode(JobConf job) {
-    return Configurator.getSimulationMode(CLASS, job);
+    return OutputConfigurator.getSimulationMode(CLASS, job);
   }
 
   /**
@@ -427,7 +423,8 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
 
     private AccumuloClient client;
 
-    protected AccumuloRecordWriter(JobConf job) {
+    protected AccumuloRecordWriter(JobConf job)
+        throws AccumuloException, AccumuloSecurityException {
       Level l = getLogLevel(job);
       if (l != null)
         log.setLevel(getLogLevel(job));
@@ -443,8 +440,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
       this.defaultTableName = (tname == null) ? null : new Text(tname);
 
       if (!simulate) {
-        this.client = Accumulo.newClient().from(Configurator.getClientProperties(CLASS, job))
-            .build();
+        this.client = OutputConfigurator.client(CLASS, job);
         mtbw = client.createMultiTableBatchWriter(getBatchWriterOptions(job));
       }
     }
@@ -581,8 +577,8 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
     if (!isConnectorInfoSet(job))
       throw new IOException("Connector info has not been set.");
-    try (AccumuloClient c = Accumulo.newClient().from(Configurator.getClientProperties(CLASS, job))
-        .build()) {
+    try {
+      AccumuloClient c = OutputConfigurator.client(CLASS, job);
       String principal = getPrincipal(job);
       AuthenticationToken token = getAuthenticationToken(job);
       if (!c.securityOperations().authenticateUser(principal, token))
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
index fb9c179..4b328e0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.InputConfigurator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -59,10 +60,6 @@ import org.apache.hadoop.mapred.Reporter;
 @Deprecated
 public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
 
-  // static wrapper class to make references to deprecated configurator easier
-  private static class Configurator
-      extends org.apache.accumulo.core.clientImpl.mapreduce.lib.InputConfigurator {}
-
   /**
    * Sets the name of the input table, over which this job will scan.
    *
@@ -73,7 +70,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setInputTableName(JobConf job, String tableName) {
-    Configurator.setInputTableName(CLASS, job, tableName);
+    InputConfigurator.setInputTableName(CLASS, job, tableName);
   }
 
   /**
@@ -86,7 +83,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setInputTableName(JobConf, String)
    */
   protected static String getInputTableName(JobConf job) {
-    return Configurator.getInputTableName(CLASS, job);
+    return InputConfigurator.getInputTableName(CLASS, job);
   }
 
   /**
@@ -100,7 +97,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setRanges(JobConf job, Collection<Range> ranges) {
-    Configurator.setRanges(CLASS, job, ranges);
+    InputConfigurator.setRanges(CLASS, job, ranges);
   }
 
   /**
@@ -115,7 +112,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setRanges(JobConf, Collection)
    */
   protected static List<Range> getRanges(JobConf job) throws IOException {
-    return Configurator.getRanges(CLASS, job);
+    return InputConfigurator.getRanges(CLASS, job);
   }
 
   /**
@@ -131,7 +128,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    */
   public static void fetchColumns(JobConf job,
       Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
-    Configurator.fetchColumns(CLASS, job, columnFamilyColumnQualifierPairs);
+    InputConfigurator.fetchColumns(CLASS, job, columnFamilyColumnQualifierPairs);
   }
 
   /**
@@ -144,7 +141,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #fetchColumns(JobConf, Collection)
    */
   protected static Set<Pair<Text,Text>> getFetchedColumns(JobConf job) {
-    return Configurator.getFetchedColumns(CLASS, job);
+    return InputConfigurator.getFetchedColumns(CLASS, job);
   }
 
   /**
@@ -157,7 +154,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void addIterator(JobConf job, IteratorSetting cfg) {
-    Configurator.addIterator(CLASS, job, cfg);
+    InputConfigurator.addIterator(CLASS, job, cfg);
   }
 
   /**
@@ -171,7 +168,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #addIterator(JobConf, IteratorSetting)
    */
   protected static List<IteratorSetting> getIterators(JobConf job) {
-    return Configurator.getIterators(CLASS, job);
+    return InputConfigurator.getIterators(CLASS, job);
   }
 
   /**
@@ -190,7 +187,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setAutoAdjustRanges(JobConf job, boolean enableFeature) {
-    Configurator.setAutoAdjustRanges(CLASS, job, enableFeature);
+    InputConfigurator.setAutoAdjustRanges(CLASS, job, enableFeature);
   }
 
   /**
@@ -204,7 +201,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setAutoAdjustRanges(JobConf, boolean)
    */
   protected static boolean getAutoAdjustRanges(JobConf job) {
-    return Configurator.getAutoAdjustRanges(CLASS, job);
+    return InputConfigurator.getAutoAdjustRanges(CLASS, job);
   }
 
   /**
@@ -220,7 +217,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setScanIsolation(JobConf job, boolean enableFeature) {
-    Configurator.setScanIsolation(CLASS, job, enableFeature);
+    InputConfigurator.setScanIsolation(CLASS, job, enableFeature);
   }
 
   /**
@@ -233,7 +230,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setScanIsolation(JobConf, boolean)
    */
   protected static boolean isIsolated(JobConf job) {
-    return Configurator.isIsolated(CLASS, job);
+    return InputConfigurator.isIsolated(CLASS, job);
   }
 
   /**
@@ -252,7 +249,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setLocalIterators(JobConf job, boolean enableFeature) {
-    Configurator.setLocalIterators(CLASS, job, enableFeature);
+    InputConfigurator.setLocalIterators(CLASS, job, enableFeature);
   }
 
   /**
@@ -265,7 +262,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setLocalIterators(JobConf, boolean)
    */
   protected static boolean usesLocalIterators(JobConf job) {
-    return Configurator.usesLocalIterators(CLASS, job);
+    return InputConfigurator.usesLocalIterators(CLASS, job);
   }
 
   /**
@@ -304,7 +301,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setOfflineTableScan(JobConf job, boolean enableFeature) {
-    Configurator.setOfflineTableScan(CLASS, job, enableFeature);
+    InputConfigurator.setOfflineTableScan(CLASS, job, enableFeature);
   }
 
   /**
@@ -317,7 +314,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setOfflineTableScan(JobConf, boolean)
    */
   protected static boolean isOfflineScan(JobConf job) {
-    return Configurator.isOfflineScan(CLASS, job);
+    return InputConfigurator.isOfflineScan(CLASS, job);
   }
 
   /**
@@ -348,7 +345,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.7.0
    */
   public static void setBatchScan(JobConf job, boolean enableFeature) {
-    Configurator.setBatchScan(CLASS, job, enableFeature);
+    InputConfigurator.setBatchScan(CLASS, job, enableFeature);
   }
 
   /**
@@ -360,7 +357,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setBatchScan(JobConf, boolean)
    */
   public static boolean isBatchScan(JobConf job) {
-    return Configurator.isBatchScan(CLASS, job);
+    return InputConfigurator.isBatchScan(CLASS, job);
   }
 
   /**
@@ -379,7 +376,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
    */
   public static void setSamplerConfiguration(JobConf job, SamplerConfiguration samplerConfig) {
-    Configurator.setSamplerConfiguration(CLASS, job, samplerConfig);
+    InputConfigurator.setSamplerConfiguration(CLASS, job, samplerConfig);
   }
 
   protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V> {
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 7f9aab6..e8e2f15 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -30,10 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchScanner;
@@ -54,13 +51,13 @@ import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.clientImpl.AuthenticationTokenIdentifier;
 import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.clientImpl.ClientInfo;
 import org.apache.accumulo.core.clientImpl.DelegationTokenImpl;
 import org.apache.accumulo.core.clientImpl.OfflineScanner;
 import org.apache.accumulo.core.clientImpl.ScannerImpl;
 import org.apache.accumulo.core.clientImpl.Table;
 import org.apache.accumulo.core.clientImpl.Tables;
 import org.apache.accumulo.core.clientImpl.TabletLocator;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.InputConfigurator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -90,10 +87,6 @@ import org.apache.log4j.Logger;
 @Deprecated
 public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
 
-  // static wrapper class to make references to deprecated configurator easier
-  private static class Configurator
-      extends org.apache.accumulo.core.clientImpl.mapreduce.lib.InputConfigurator {}
-
   protected static final Class<?> CLASS = AccumuloInputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
 
@@ -107,7 +100,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * @since 1.8.0
    */
   public static void setClassLoaderContext(Job job, String context) {
-    Configurator.setClassLoaderContext(CLASS, job.getConfiguration(), context);
+    InputConfigurator.setClassLoaderContext(CLASS, job.getConfiguration(), context);
   }
 
   /**
@@ -119,7 +112,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * @since 1.8.0
    */
   public static String getClassLoaderContext(JobContext job) {
-    return Configurator.getClassLoaderContext(CLASS, job.getConfiguration());
+    return InputConfigurator.getClassLoaderContext(CLASS, job.getConfiguration());
   }
 
   /**
@@ -146,9 +139,8 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
       throws AccumuloSecurityException {
     if (token instanceof KerberosToken) {
       log.info("Received KerberosToken, attempting to fetch DelegationToken");
-      try (AccumuloClient client = Accumulo.newClient()
-          .from(Configurator.getClientProperties(CLASS, job.getConfiguration()))
-          .as(principal, token).build()) {
+      try {
+        ClientContext client = InputConfigurator.client(CLASS, job.getConfiguration());
         token = client.securityOperations().getDelegationToken(new DelegationTokenConfig());
       } catch (Exception e) {
         log.warn("Failed to automatically obtain DelegationToken, "
@@ -169,7 +161,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
       job.getCredentials().addToken(hadoopToken.getService(), hadoopToken);
     }
 
-    Configurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
+    InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
   }
 
   /**
@@ -189,33 +181,33 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    */
   public static void setConnectorInfo(Job job, String principal, String tokenFile)
       throws AccumuloSecurityException {
-    Configurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile);
+    InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile);
   }
 
   /**
    * Determines if the connector has been configured.
    *
-   * @param context
+   * @param job
    *          the Hadoop context for the configured job
    * @return true if the connector has been configured, false otherwise
    * @since 1.5.0
    * @see #setConnectorInfo(Job, String, AuthenticationToken)
    */
-  protected static Boolean isConnectorInfoSet(JobContext context) {
-    return Configurator.isConnectorInfoSet(CLASS, context.getConfiguration());
+  protected static Boolean isConnectorInfoSet(JobContext job) {
+    return InputConfigurator.isConnectorInfoSet(CLASS, job.getConfiguration());
   }
 
   /**
    * Gets the user name from the configuration.
    *
-   * @param context
+   * @param job
    *          the Hadoop context for the configured job
    * @return the user name
    * @since 1.5.0
    * @see #setConnectorInfo(Job, String, AuthenticationToken)
    */
-  protected static String getPrincipal(JobContext context) {
-    return Configurator.getPrincipal(CLASS, context.getConfiguration());
+  protected static String getPrincipal(JobContext job) {
+    return InputConfigurator.getPrincipal(CLASS, job.getConfiguration());
   }
 
   /**
@@ -225,8 +217,8 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead.
    */
   @Deprecated
-  protected static String getTokenClass(JobContext context) {
-    return getAuthenticationToken(context).getClass().getName();
+  protected static String getTokenClass(JobContext job) {
+    return getAuthenticationToken(job).getClass().getName();
   }
 
   /**
@@ -236,26 +228,25 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead.
    */
   @Deprecated
-  protected static byte[] getToken(JobContext context) {
-    return AuthenticationToken.AuthenticationTokenSerializer
-        .serialize(getAuthenticationToken(context));
+  protected static byte[] getToken(JobContext job) {
+    return AuthenticationToken.AuthenticationTokenSerializer.serialize(getAuthenticationToken(job));
   }
 
   /**
    * Gets the authenticated token from either the specified token file or directly from the
    * configuration, whichever was used when the job was configured.
    *
-   * @param context
+   * @param job
    *          the Hadoop context for the configured job
    * @return the principal's authentication token
    * @since 1.6.0
    * @see #setConnectorInfo(Job, String, AuthenticationToken)
    * @see #setConnectorInfo(Job, String, String)
    */
-  protected static AuthenticationToken getAuthenticationToken(JobContext context) {
-    AuthenticationToken token = Configurator.getAuthenticationToken(CLASS,
-        context.getConfiguration());
-    return Configurator.unwrapAuthenticationToken(context, token);
+  protected static AuthenticationToken getAuthenticationToken(JobContext job) {
+    AuthenticationToken token = InputConfigurator.getAuthenticationToken(CLASS,
+        job.getConfiguration());
+    return InputConfigurator.unwrapAuthenticationToken(job, token);
   }
 
   /**
@@ -288,20 +279,20 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    */
   public static void setZooKeeperInstance(Job job,
       org.apache.accumulo.core.client.ClientConfiguration clientConfig) {
-    Configurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig);
+    InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig);
   }
 
   /**
    * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the
    * configuration.
    *
-   * @param context
+   * @param job
    *          the Hadoop context for the configured job
    * @return an Accumulo instance
    * @since 1.5.0
    */
-  protected static org.apache.accumulo.core.client.Instance getInstance(JobContext context) {
-    return Configurator.getInstance(CLASS, context.getConfiguration());
+  protected static org.apache.accumulo.core.client.Instance getInstance(JobContext job) {
+    return InputConfigurator.getInstance(CLASS, job.getConfiguration());
   }
 
   /**
@@ -314,20 +305,20 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setLogLevel(Job job, Level level) {
-    Configurator.setLogLevel(CLASS, job.getConfiguration(), level);
+    InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
   }
 
   /**
    * Gets the log level from this configuration.
    *
-   * @param context
+   * @param job
    *          the Hadoop context for the configured job
    * @return the log level
    * @since 1.5.0
    * @see #setLogLevel(Job, Level)
    */
-  protected static Level getLogLevel(JobContext context) {
-    return Configurator.getLogLevel(CLASS, context.getConfiguration());
+  protected static Level getLogLevel(JobContext job) {
+    return InputConfigurator.getLogLevel(CLASS, job.getConfiguration());
   }
 
   /**
@@ -340,32 +331,32 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    *          the user's authorizations
    */
   public static void setScanAuthorizations(Job job, Authorizations auths) {
-    Configurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths);
+    InputConfigurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths);
   }
 
   /**
    * Gets the authorizations to set for the scans from the configuration.
    *
-   * @param context
+   * @param job
    *          the Hadoop context for the configured job
    * @return the Accumulo scan authorizations
    * @since 1.5.0
    * @see #setScanAuthorizations(Job, Authorizations)
    */
-  protected static Authorizations getScanAuthorizations(JobContext context) {
-    return Configurator.getScanAuthorizations(CLASS, context.getConfiguration());
+  protected static Authorizations getScanAuthorizations(JobContext job) {
+    return InputConfigurator.getScanAuthorizations(CLASS, job.getConfiguration());
   }
 
   /**
    * Fetches all {@link InputTableConfig}s that have been set on the given job.
    *
-   * @param context
+   * @param job
    *          the Hadoop job instance to be configured
    * @return the {@link InputTableConfig} objects for the job
    * @since 1.6.0
    */
-  protected static Map<String,InputTableConfig> getInputTableConfigs(JobContext context) {
-    return Configurator.getInputTableConfigs(CLASS, context.getConfiguration());
+  protected static Map<String,InputTableConfig> getInputTableConfigs(JobContext job) {
+    return InputConfigurator.getInputTableConfigs(CLASS, job.getConfiguration());
   }
 
   /**
@@ -374,15 +365,15 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * <p>
    * null is returned in the event that the table doesn't exist.
    *
-   * @param context
+   * @param job
    *          the Hadoop job instance to be configured
    * @param tableName
    *          the table name for which to grab the config object
    * @return the {@link InputTableConfig} for the given table
    * @since 1.6.0
    */
-  protected static InputTableConfig getInputTableConfig(JobContext context, String tableName) {
-    return Configurator.getInputTableConfig(CLASS, context.getConfiguration(), tableName);
+  protected static InputTableConfig getInputTableConfig(JobContext job, String tableName) {
+    return InputConfigurator.getInputTableConfig(CLASS, job.getConfiguration(), tableName);
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -390,28 +381,28 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * Check whether a configuration is fully configured to be used with an Accumulo
    * {@link org.apache.hadoop.mapreduce.InputFormat}.
    *
-   * @param context
+   * @param job
    *          the Hadoop context for the configured job
    * @throws java.io.IOException
    *           if the context is improperly configured
    * @since 1.5.0
    */
-  protected static void validateOptions(JobContext context) throws IOException {
-    Configurator.validatePermissions(CLASS, context.getConfiguration());
+  protected static void validateOptions(JobContext job) throws IOException {
+    InputConfigurator.validatePermissions(CLASS, job.getConfiguration());
   }
 
   /**
    * Construct the {@link org.apache.accumulo.core.client.ClientConfiguration} given the provided
    * context.
    *
-   * @param context
+   * @param job
    *          The Job
    * @return The ClientConfiguration
    * @since 1.7.0
    */
   protected static org.apache.accumulo.core.client.ClientConfiguration getClientConfiguration(
-      JobContext context) {
-    return Configurator.getClientConfiguration(CLASS, context.getConfiguration());
+      JobContext job) {
+    return InputConfigurator.getClientConfiguration(CLASS, job.getConfiguration());
   }
 
   /**
@@ -427,7 +418,6 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * </ul>
    */
   protected abstract static class AbstractRecordReader<K,V> extends RecordReader<K,V> {
-    private ClientContext context = null;
     protected long numKeysRead;
     protected Iterator<Map.Entry<Key,Value>> scannerIterator;
     protected ScannerBase scannerBase;
@@ -502,10 +492,6 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
       split = (RangeInputSplit) inSplit;
       log.debug("Initializing input split: " + split);
 
-      if (context == null) {
-        context = new ClientContext(
-            Configurator.getClientProperties(CLASS, attempt.getConfiguration()));
-      }
       Authorizations authorizations = getScanAuthorizations(attempt);
       String classLoaderContext = getClassLoaderContext(attempt);
       String table = split.getTableName();
@@ -515,7 +501,9 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
       // but the scanner will use the table id resolved at job setup time
       InputTableConfig tableConfig = getInputTableConfig(attempt, split.getTableName());
 
-      log.debug("Creating client with user: " + context.whoami());
+      ClientContext client = InputConfigurator.client(CLASS, split, attempt.getConfiguration());
+
+      log.debug("Created client with user: " + client.whoami());
       log.debug("Creating scanner for table: " + table);
       log.debug("Authorizations are: " + authorizations);
 
@@ -527,7 +515,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
           // Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit
           // will not span tablets
           int scanThreads = 1;
-          scanner = context.createBatchScanner(split.getTableName(), authorizations, scanThreads);
+          scanner = client.createBatchScanner(split.getTableName(), authorizations, scanThreads);
           setupIterators(attempt, scanner, split.getTableName(), split);
           if (classLoaderContext != null) {
             scanner.setClassLoaderContext(classLoaderContext);
@@ -559,11 +547,11 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
 
         try {
           if (isOffline) {
-            scanner = new OfflineScanner(context, Table.ID.of(split.getTableId()), authorizations);
+            scanner = new OfflineScanner(client, Table.ID.of(split.getTableId()), authorizations);
           } else {
             // Not using public API to create scanner so that we can use table ID
             // Table ID is used in case of renames during M/R job
-            scanner = new ScannerImpl(context, Table.ID.of(split.getTableId()), authorizations);
+            scanner = new ScannerImpl(client, Table.ID.of(split.getTableId()), authorizations);
           }
           if (isIsolated) {
             log.info("Creating isolated scanner");
@@ -615,26 +603,9 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
 
     @Override
     public void close() {
-      // close several objects, aggregating any exceptions thrown
-      Stream.of(scannerBase, context).flatMap(o -> {
-        try {
-          if (o != null) {
-            o.close();
-          }
-          return null;
-        } catch (Exception e) {
-          return Stream.of(e);
-        }
-      }).reduce((e1, e2) -> {
-        e1.addSuppressed(e2);
-        return e1;
-      }).ifPresent(e -> {
-        if (e instanceof RuntimeException) {
-          throw (RuntimeException) e;
-        } else {
-          throw new RuntimeException(e);
-        }
-      });
+      if (scannerBase != null) {
+        scannerBase.close();
+      }
     }
 
     @Override
@@ -671,11 +642,11 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
     }
   }
 
-  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, Table.ID tableId,
+  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext job, Table.ID tableId,
       List<Range> ranges)
       throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-    return Configurator.binOffline(tableId, ranges,
-        ClientInfo.from(Configurator.getClientProperties(CLASS, context.getConfiguration())));
+    return InputConfigurator.binOffline(tableId, ranges,
+        InputConfigurator.client(CLASS, job.getConfiguration()));
   }
 
   /**
@@ -688,154 +659,158 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    *           locator
    */
   @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException {
-    Level logLevel = getLogLevel(context);
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    Level logLevel = getLogLevel(job);
     log.setLevel(logLevel);
-    validateOptions(context);
+    validateOptions(job);
+
     Random random = new SecureRandom();
     LinkedList<InputSplit> splits = new LinkedList<>();
-    Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(context);
-    try (AccumuloClient client = Accumulo.newClient()
-        .from(Configurator.getClientProperties(CLASS, context.getConfiguration())).build()) {
-      for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
+    Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(job);
 
-        String tableName = tableConfigEntry.getKey();
-        InputTableConfig tableConfig = tableConfigEntry.getValue();
+    for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
 
-        Table.ID tableId;
-        // resolve table name to id once, and use id from this point forward
-        try {
-          tableId = Tables.getTableId((ClientContext) client, tableName);
-        } catch (TableNotFoundException e) {
-          throw new IOException(e);
-        }
-
-        boolean batchScan = Configurator.isBatchScan(CLASS, context.getConfiguration());
-        boolean supportBatchScan = !(tableConfig.isOfflineScan()
-            || tableConfig.shouldUseIsolatedScanners() || tableConfig.shouldUseLocalIterators());
-        if (batchScan && !supportBatchScan)
-          throw new IllegalArgumentException("BatchScanner optimization not available for offline"
-              + " scan, isolated, or local iterators");
-
-        boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
-        if (batchScan && !autoAdjust)
-          throw new IllegalArgumentException(
-              "AutoAdjustRanges must be enabled when using BatchScanner optimization");
-
-        List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges())
-            : tableConfig.getRanges();
-        if (ranges.isEmpty()) {
-          ranges = new ArrayList<>(1);
-          ranges.add(new Range());
-        }
+      String tableName = tableConfigEntry.getKey();
+      InputTableConfig tableConfig = tableConfigEntry.getValue();
 
-        // get the metadata information for these ranges
-        Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
-        TabletLocator tl;
-        try {
-          if (tableConfig.isOfflineScan()) {
-            binnedRanges = binOfflineTable(context, tableId, ranges);
-            while (binnedRanges == null) {
-              // Some tablets were still online, try again
-              // sleep randomly between 100 and 200 ms
-              sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
-              binnedRanges = binOfflineTable(context, tableId, ranges);
+      ClientContext client;
+      try {
+        client = InputConfigurator.client(CLASS, job.getConfiguration());
+      } catch (AccumuloException | AccumuloSecurityException e) {
+        throw new IOException(e);
+      }
 
-            }
-          } else {
-            tl = TabletLocator.getLocator((ClientContext) client, tableId);
-            // its possible that the cache could contain complete, but old information about a
-            // tables tablets... so clear it
-            tl.invalidateCache();
+      Table.ID tableId;
+      // resolve table name to id once, and use id from this point forward
+      try {
+        tableId = Tables.getTableId(client, tableName);
+      } catch (TableNotFoundException e) {
+        throw new IOException(e);
+      }
 
-            while (!tl.binRanges((ClientContext) client, ranges, binnedRanges).isEmpty()) {
-              String tableIdStr = tableId.canonicalID();
-              if (!Tables.exists((ClientContext) client, tableId))
-                throw new TableDeletedException(tableIdStr);
-              if (Tables.getTableState((ClientContext) client, tableId) == TableState.OFFLINE)
-                throw new TableOfflineException(
-                    Tables.getTableOfflineMsg((ClientContext) client, tableId));
-              binnedRanges.clear();
-              log.warn("Unable to locate bins for specified ranges. Retrying.");
-              // sleep randomly between 100 and 200 ms
-              sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
-              tl.invalidateCache();
-            }
-          }
-        } catch (Exception e) {
-          throw new IOException(e);
-        }
+      boolean batchScan = InputConfigurator.isBatchScan(CLASS, job.getConfiguration());
+      boolean supportBatchScan = !(tableConfig.isOfflineScan()
+          || tableConfig.shouldUseIsolatedScanners() || tableConfig.shouldUseLocalIterators());
+      if (batchScan && !supportBatchScan)
+        throw new IllegalArgumentException("BatchScanner optimization not available for offline"
+            + " scan, isolated, or local iterators");
+
+      boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
+      if (batchScan && !autoAdjust)
+        throw new IllegalArgumentException(
+            "AutoAdjustRanges must be enabled when using BatchScanner optimization");
+
+      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges())
+          : tableConfig.getRanges();
+      if (ranges.isEmpty()) {
+        ranges = new ArrayList<>(1);
+        ranges.add(new Range());
+      }
 
-        // all of this code will add either range per each locations or split ranges and add
-        // range-location split
-        // Map from Range to Array of Locations, we only use this if we're don't split
-        HashMap<Range,ArrayList<String>> splitsToAdd = null;
-
-        if (!autoAdjust)
-          splitsToAdd = new HashMap<>();
-
-        HashMap<String,String> hostNameCache = new HashMap<>();
-        for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-          String ip = tserverBin.getKey().split(":", 2)[0];
-          String location = hostNameCache.get(ip);
-          if (location == null) {
-            InetAddress inetAddress = InetAddress.getByName(ip);
-            location = inetAddress.getCanonicalHostName();
-            hostNameCache.put(ip, location);
+      // get the metadata information for these ranges
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
+      TabletLocator tl;
+      try {
+        if (tableConfig.isOfflineScan()) {
+          binnedRanges = binOfflineTable(job, tableId, ranges);
+          while (binnedRanges == null) {
+            // Some tablets were still online, try again
+            // sleep randomly between 100 and 200 ms
+            sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
+            binnedRanges = binOfflineTable(job, tableId, ranges);
           }
-          for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-            Range ke = extentRanges.getKey().toDataRange();
-            if (batchScan) {
-              // group ranges by tablet to be read by a BatchScanner
-              ArrayList<Range> clippedRanges = new ArrayList<>();
-              for (Range r : extentRanges.getValue())
-                clippedRanges.add(ke.clip(r));
-              org.apache.accumulo.core.clientImpl.mapreduce.BatchInputSplit split = new org.apache.accumulo.core.clientImpl.mapreduce.BatchInputSplit(
-                  tableName, tableId, clippedRanges, new String[] {location});
-              org.apache.accumulo.core.clientImpl.mapreduce.SplitUtils.updateSplit(split,
-                  tableConfig, logLevel);
-
-              splits.add(split);
-            } else {
-              // not grouping by tablet
-              for (Range r : extentRanges.getValue()) {
-                if (autoAdjust) {
-                  // divide ranges into smaller ranges, based on the tablets
-                  RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(),
-                      ke.clip(r), new String[] {location});
-                  org.apache.accumulo.core.clientImpl.mapreduce.SplitUtils.updateSplit(split,
-                      tableConfig, logLevel);
-                  split.setOffline(tableConfig.isOfflineScan());
-                  split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
-                  split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
-                  splits.add(split);
-                } else {
-                  // don't divide ranges
-                  ArrayList<String> locations = splitsToAdd.get(r);
-                  if (locations == null)
-                    locations = new ArrayList<>(1);
-                  locations.add(location);
-                  splitsToAdd.put(r, locations);
-                }
-              }
-            }
+        } else {
+          tl = TabletLocator.getLocator(client, tableId);
+          // it's possible that the cache could contain complete, but old information about a
+          // table's tablets... so clear it
+          tl.invalidateCache();
+
+          while (!tl.binRanges(client, ranges, binnedRanges).isEmpty()) {
+            String tableIdStr = tableId.canonicalID();
+            if (!Tables.exists(client, tableId))
+              throw new TableDeletedException(tableIdStr);
+            if (Tables.getTableState(client, tableId) == TableState.OFFLINE)
+              throw new TableOfflineException(Tables.getTableOfflineMsg(client, tableId));
+            binnedRanges.clear();
+            log.warn("Unable to locate bins for specified ranges. Retrying.");
+            // sleep randomly between 100 and 200 ms
+            sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
+            tl.invalidateCache();
           }
         }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
 
-        if (!autoAdjust)
-          for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) {
-            RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(),
-                entry.getKey(), entry.getValue().toArray(new String[0]));
+      // all of this code will add either one range per location, or split the ranges and add
+      // range-location splits
+      // Map from Range to Array of Locations; we only use this if we don't split
+      HashMap<Range,ArrayList<String>> splitsToAdd = null;
+
+      if (!autoAdjust)
+        splitsToAdd = new HashMap<>();
+
+      HashMap<String,String> hostNameCache = new HashMap<>();
+      for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+        String ip = tserverBin.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getCanonicalHostName();
+          hostNameCache.put(ip, location);
+        }
+        for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+          Range ke = extentRanges.getKey().toDataRange();
+          if (batchScan) {
+            // group ranges by tablet to be read by a BatchScanner
+            ArrayList<Range> clippedRanges = new ArrayList<>();
+            for (Range r : extentRanges.getValue())
+              clippedRanges.add(ke.clip(r));
+            org.apache.accumulo.core.clientImpl.mapreduce.BatchInputSplit split = new org.apache.accumulo.core.clientImpl.mapreduce.BatchInputSplit(
+                tableName, tableId, clippedRanges, new String[] {location});
             org.apache.accumulo.core.clientImpl.mapreduce.SplitUtils.updateSplit(split, tableConfig,
                 logLevel);
-            split.setOffline(tableConfig.isOfflineScan());
-            split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
-            split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
 
             splits.add(split);
+          } else {
+            // not grouping by tablet
+            for (Range r : extentRanges.getValue()) {
+              if (autoAdjust) {
+                // divide ranges into smaller ranges, based on the tablets
+                RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(),
+                    ke.clip(r), new String[] {location});
+                org.apache.accumulo.core.clientImpl.mapreduce.SplitUtils.updateSplit(split,
+                    tableConfig, logLevel);
+                split.setOffline(tableConfig.isOfflineScan());
+                split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
+                split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
+                splits.add(split);
+              } else {
+                // don't divide ranges
+                ArrayList<String> locations = splitsToAdd.get(r);
+                if (locations == null)
+                  locations = new ArrayList<>(1);
+                locations.add(location);
+                splitsToAdd.put(r, locations);
+              }
+            }
           }
+        }
       }
-      return splits;
+
+      if (!autoAdjust)
+        for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) {
+          RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(),
+              entry.getKey(), entry.getValue().toArray(new String[0]));
+          org.apache.accumulo.core.clientImpl.mapreduce.SplitUtils.updateSplit(split, tableConfig,
+              logLevel);
+          split.setOffline(tableConfig.isOfflineScan());
+          split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
+          split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
+
+          splits.add(split);
+        }
     }
+    return splits;
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
index 2ae54a0..ece9391 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.FileOutputConfigurator;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -52,10 +53,6 @@ import org.apache.log4j.Logger;
 @Deprecated
 public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
 
-  // static wrapper class to make references to deprecated configurator easier
-  private static class Configurator
-      extends org.apache.accumulo.core.clientImpl.mapreduce.lib.FileOutputConfigurator {}
-
   private static final Class<?> CLASS = AccumuloFileOutputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
 
@@ -70,7 +67,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    * @since 1.5.0
    */
   public static void setCompressionType(Job job, String compressionType) {
-    Configurator.setCompressionType(CLASS, job.getConfiguration(), compressionType);
+    FileOutputConfigurator.setCompressionType(CLASS, job.getConfiguration(), compressionType);
   }
 
   /**
@@ -89,7 +86,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    * @since 1.5.0
    */
   public static void setDataBlockSize(Job job, long dataBlockSize) {
-    Configurator.setDataBlockSize(CLASS, job.getConfiguration(), dataBlockSize);
+    FileOutputConfigurator.setDataBlockSize(CLASS, job.getConfiguration(), dataBlockSize);
   }
 
   /**
@@ -103,7 +100,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    * @since 1.5.0
    */
   public static void setFileBlockSize(Job job, long fileBlockSize) {
-    Configurator.setFileBlockSize(CLASS, job.getConfiguration(), fileBlockSize);
+    FileOutputConfigurator.setFileBlockSize(CLASS, job.getConfiguration(), fileBlockSize);
   }
 
   /**
@@ -118,7 +115,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    * @since 1.5.0
    */
   public static void setIndexBlockSize(Job job, long indexBlockSize) {
-    Configurator.setIndexBlockSize(CLASS, job.getConfiguration(), indexBlockSize);
+    FileOutputConfigurator.setIndexBlockSize(CLASS, job.getConfiguration(), indexBlockSize);
   }
 
   /**
@@ -132,7 +129,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    * @since 1.5.0
    */
   public static void setReplication(Job job, int replication) {
-    Configurator.setReplication(CLASS, job.getConfiguration(), replication);
+    FileOutputConfigurator.setReplication(CLASS, job.getConfiguration(), replication);
   }
 
   /**
@@ -147,19 +144,19 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
    */
 
   public static void setSampler(Job job, SamplerConfiguration samplerConfig) {
-    Configurator.setSampler(CLASS, job.getConfiguration(), samplerConfig);
+    FileOutputConfigurator.setSampler(CLASS, job.getConfiguration(), samplerConfig);
   }
 
   @Override
   public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException {
     // get the path of the temporary output file
     final Configuration conf = context.getConfiguration();
-    final AccumuloConfiguration acuConf = Configurator.getAccumuloConfiguration(CLASS,
+    final AccumuloConfiguration acuConf = FileOutputConfigurator.getAccumuloConfiguration(CLASS,
         context.getConfiguration());
 
     final String extension = acuConf.get(Property.TABLE_FILE_TYPE);
     final Path file = this.getDefaultWorkFile(context, "." + extension);
-    final int visCacheSize = Configurator.getVisibilityCacheSize(conf);
+    final int visCacheSize = FileOutputConfigurator.getVisibilityCacheSize(conf);
 
     return new RecordWriter<Key,Value>() {
       RFileWriter out = null;
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 82214b3..99b414f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -41,7 +40,9 @@ import org.apache.accumulo.core.client.security.tokens.DelegationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.clientImpl.AuthenticationTokenIdentifier;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.DelegationTokenImpl;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.OutputConfigurator;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TabletId;
@@ -79,10 +80,6 @@ import org.apache.log4j.Logger;
 @Deprecated
 public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
 
-  // static wrapper class to make references to deprecated configurator easier
-  private static class Configurator
-      extends org.apache.accumulo.core.clientImpl.mapreduce.lib.OutputConfigurator {}
-
   private static final Class<?> CLASS = AccumuloOutputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
 
@@ -112,9 +109,8 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
     if (token instanceof KerberosToken) {
       log.info("Received KerberosToken, attempting to fetch DelegationToken");
       try {
-        org.apache.accumulo.core.client.Instance instance = getInstance(job);
-        org.apache.accumulo.core.client.Connector conn = instance.getConnector(principal, token);
-        token = conn.securityOperations().getDelegationToken(new DelegationTokenConfig());
+        ClientContext client = OutputConfigurator.client(CLASS, job.getConfiguration());
+        token = client.securityOperations().getDelegationToken(new DelegationTokenConfig());
       } catch (Exception e) {
         log.warn("Failed to automatically obtain DelegationToken, "
             + "Mappers/Reducers will likely fail to communicate with Accumulo", e);
@@ -134,7 +130,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
       job.getCredentials().addToken(hadoopToken.getService(), hadoopToken);
     }
 
-    Configurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
+    OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
   }
 
   /**
@@ -155,7 +151,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    */
   public static void setConnectorInfo(Job job, String principal, String tokenFile)
       throws AccumuloSecurityException {
-    Configurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile);
+    OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile);
   }
 
   /**
@@ -168,7 +164,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @see #setConnectorInfo(Job, String, AuthenticationToken)
    */
   protected static Boolean isConnectorInfoSet(JobContext context) {
-    return Configurator.isConnectorInfoSet(CLASS, context.getConfiguration());
+    return OutputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration());
   }
 
   /**
@@ -181,7 +177,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @see #setConnectorInfo(Job, String, AuthenticationToken)
    */
   protected static String getPrincipal(JobContext context) {
-    return Configurator.getPrincipal(CLASS, context.getConfiguration());
+    return OutputConfigurator.getPrincipal(CLASS, context.getConfiguration());
   }
 
   /**
@@ -218,9 +214,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @see #setConnectorInfo(Job, String, String)
    */
   protected static AuthenticationToken getAuthenticationToken(JobContext context) {
-    AuthenticationToken token = Configurator.getAuthenticationToken(CLASS,
+    AuthenticationToken token = OutputConfigurator.getAuthenticationToken(CLASS,
         context.getConfiguration());
-    return Configurator.unwrapAuthenticationToken(context, token);
+    return OutputConfigurator.unwrapAuthenticationToken(context, token);
   }
 
   /**
@@ -255,7 +251,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    */
   public static void setZooKeeperInstance(Job job,
       org.apache.accumulo.core.client.ClientConfiguration clientConfig) {
-    Configurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig);
+    OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig);
   }
 
   /**
@@ -268,7 +264,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   protected static org.apache.accumulo.core.client.Instance getInstance(JobContext context) {
-    return Configurator.getInstance(CLASS, context.getConfiguration());
+    return OutputConfigurator.getInstance(CLASS, context.getConfiguration());
   }
 
   /**
@@ -281,7 +277,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   public static void setLogLevel(Job job, Level level) {
-    Configurator.setLogLevel(CLASS, job.getConfiguration(), level);
+    OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
   }
 
   /**
@@ -294,7 +290,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @see #setLogLevel(Job, Level)
    */
   protected static Level getLogLevel(JobContext context) {
-    return Configurator.getLogLevel(CLASS, context.getConfiguration());
+    return OutputConfigurator.getLogLevel(CLASS, context.getConfiguration());
   }
 
   /**
@@ -308,7 +304,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   public static void setDefaultTableName(Job job, String tableName) {
-    Configurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName);
+    OutputConfigurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName);
   }
 
   /**
@@ -321,7 +317,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @see #setDefaultTableName(Job, String)
    */
   protected static String getDefaultTableName(JobContext context) {
-    return Configurator.getDefaultTableName(CLASS, context.getConfiguration());
+    return OutputConfigurator.getDefaultTableName(CLASS, context.getConfiguration());
   }
 
   /**
@@ -336,7 +332,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
-    Configurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig);
+    OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig);
   }
 
   /**
@@ -349,7 +345,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @see #setBatchWriterOptions(Job, BatchWriterConfig)
    */
   protected static BatchWriterConfig getBatchWriterOptions(JobContext context) {
-    return Configurator.getBatchWriterOptions(CLASS, context.getConfiguration());
+    return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration());
   }
 
   /**
@@ -366,7 +362,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   public static void setCreateTables(Job job, boolean enableFeature) {
-    Configurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature);
+    OutputConfigurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature);
   }
 
   /**
@@ -379,7 +375,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @see #setCreateTables(Job, boolean)
    */
   protected static Boolean canCreateTables(JobContext context) {
-    return Configurator.canCreateTables(CLASS, context.getConfiguration());
+    return OutputConfigurator.canCreateTables(CLASS, context.getConfiguration());
   }
 
   /**
@@ -396,7 +392,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @since 1.5.0
    */
   public static void setSimulationMode(Job job, boolean enableFeature) {
-    Configurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature);
+    OutputConfigurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature);
   }
 
   /**
@@ -409,7 +405,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @see #setSimulationMode(Job, boolean)
    */
   protected static Boolean getSimulationMode(JobContext context) {
-    return Configurator.getSimulationMode(CLASS, context.getConfiguration());
+    return OutputConfigurator.getSimulationMode(CLASS, context.getConfiguration());
   }
 
   /**
@@ -445,8 +441,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
       this.defaultTableName = (tname == null) ? null : new Text(tname);
 
       if (!simulate) {
-        this.client = Accumulo.newClient()
-            .from(Configurator.getClientProperties(CLASS, context.getConfiguration())).build();
+        this.client = OutputConfigurator.client(CLASS, context.getConfiguration());
         mtbw = client.createMultiTableBatchWriter(getBatchWriterOptions(context));
       }
     }
@@ -585,8 +580,8 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
       throw new IOException("Connector info has not been set.");
     String principal = getPrincipal(job);
     AuthenticationToken token = getAuthenticationToken(job);
-    try (AccumuloClient c = Accumulo.newClient()
-        .from(Configurator.getClientProperties(CLASS, job.getConfiguration())).build()) {
+    try {
+      AccumuloClient c = OutputConfigurator.client(CLASS, job.getConfiguration());
       if (!c.securityOperations().authenticateUser(principal, token))
         throw new IOException("Unable to authenticate user");
     } catch (AccumuloException | AccumuloSecurityException e) {
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 76f0400..983d4a6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.InputConfigurator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -60,10 +61,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 @Deprecated
 public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
 
-  // static wrapper class to make references to deprecated configurator easier
-  private static class Configurator
-      extends org.apache.accumulo.core.clientImpl.mapreduce.lib.InputConfigurator {}
-
   /**
    * Gets the table name from the configuration.
    *
@@ -74,7 +71,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setInputTableName(Job, String)
    */
   protected static String getInputTableName(JobContext context) {
-    return Configurator.getInputTableName(CLASS, context.getConfiguration());
+    return InputConfigurator.getInputTableName(CLASS, context.getConfiguration());
   }
 
   /**
@@ -87,7 +84,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setInputTableName(Job job, String tableName) {
-    Configurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
+    InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
   }
 
   /**
@@ -101,7 +98,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setRanges(Job job, Collection<Range> ranges) {
-    Configurator.setRanges(CLASS, job.getConfiguration(), ranges);
+    InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
   }
 
   /**
@@ -114,7 +111,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setRanges(Job, Collection)
    */
   protected static List<Range> getRanges(JobContext context) throws IOException {
-    return Configurator.getRanges(CLASS, context.getConfiguration());
+    return InputConfigurator.getRanges(CLASS, context.getConfiguration());
   }
 
   /**
@@ -130,7 +127,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    */
   public static void fetchColumns(Job job,
       Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
-    Configurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
+    InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
   }
 
   /**
@@ -143,7 +140,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #fetchColumns(Job, Collection)
    */
   protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
-    return Configurator.getFetchedColumns(CLASS, context.getConfiguration());
+    return InputConfigurator.getFetchedColumns(CLASS, context.getConfiguration());
   }
 
   /**
@@ -156,7 +153,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void addIterator(Job job, IteratorSetting cfg) {
-    Configurator.addIterator(CLASS, job.getConfiguration(), cfg);
+    InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
   }
 
   /**
@@ -170,7 +167,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #addIterator(Job, IteratorSetting)
    */
   protected static List<IteratorSetting> getIterators(JobContext context) {
-    return Configurator.getIterators(CLASS, context.getConfiguration());
+    return InputConfigurator.getIterators(CLASS, context.getConfiguration());
   }
 
   /**
@@ -189,7 +186,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
-    Configurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
+    InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
   }
 
   /**
@@ -203,7 +200,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setAutoAdjustRanges(Job, boolean)
    */
   protected static boolean getAutoAdjustRanges(JobContext context) {
-    return Configurator.getAutoAdjustRanges(CLASS, context.getConfiguration());
+    return InputConfigurator.getAutoAdjustRanges(CLASS, context.getConfiguration());
   }
 
   /**
@@ -219,7 +216,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setScanIsolation(Job job, boolean enableFeature) {
-    Configurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
+    InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
   }
 
   /**
@@ -232,7 +229,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setScanIsolation(Job, boolean)
    */
   protected static boolean isIsolated(JobContext context) {
-    return Configurator.isIsolated(CLASS, context.getConfiguration());
+    return InputConfigurator.isIsolated(CLASS, context.getConfiguration());
   }
 
   /**
@@ -251,7 +248,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setLocalIterators(Job job, boolean enableFeature) {
-    Configurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
+    InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
   }
 
   /**
@@ -264,7 +261,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setLocalIterators(Job, boolean)
    */
   protected static boolean usesLocalIterators(JobContext context) {
-    return Configurator.usesLocalIterators(CLASS, context.getConfiguration());
+    return InputConfigurator.usesLocalIterators(CLASS, context.getConfiguration());
   }
 
   /**
@@ -303,7 +300,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setOfflineTableScan(Job job, boolean enableFeature) {
-    Configurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
+    InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
   }
 
   /**
@@ -316,7 +313,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setOfflineTableScan(Job, boolean)
    */
   protected static boolean isOfflineScan(JobContext context) {
-    return Configurator.isOfflineScan(CLASS, context.getConfiguration());
+    return InputConfigurator.isOfflineScan(CLASS, context.getConfiguration());
   }
 
   /**
@@ -347,7 +344,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @since 1.7.0
    */
   public static void setBatchScan(Job job, boolean enableFeature) {
-    Configurator.setBatchScan(CLASS, job.getConfiguration(), enableFeature);
+    InputConfigurator.setBatchScan(CLASS, job.getConfiguration(), enableFeature);
   }
 
   /**
@@ -359,7 +356,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see #setBatchScan(Job, boolean)
    */
   public static boolean isBatchScan(JobContext context) {
-    return Configurator.isBatchScan(CLASS, context.getConfiguration());
+    return InputConfigurator.isBatchScan(CLASS, context.getConfiguration());
   }
 
   /**
@@ -378,7 +375,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
    * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
    */
   public static void setSamplerConfiguration(Job job, SamplerConfiguration samplerConfig) {
-    Configurator.setSamplerConfiguration(CLASS, job.getConfiguration(), samplerConfig);
+    InputConfigurator.setSamplerConfiguration(CLASS, job.getConfiguration(), samplerConfig);
   }
 
   protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V> {
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java
index 30816df..597c8b2 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java
@@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchDeleter;
@@ -72,7 +71,7 @@ public class ConnectorImpl extends org.apache.accumulo.core.client.Connector {
     }
   }
 
-  public AccumuloClient getAccumuloClient() {
+  public ClientContext getAccumuloClient() {
     return context;
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
index cfccdb4..d5a282f 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
@@ -22,24 +22,19 @@ import static java.util.Objects.requireNonNull;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Properties;
-import java.util.Scanner;
+import java.util.Base64;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
 import org.apache.accumulo.core.clientImpl.AuthenticationTokenIdentifier;
-import org.apache.accumulo.core.clientImpl.ClientConfConverter;
-import org.apache.accumulo.core.clientImpl.ClientInfo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.Credentials;
 import org.apache.accumulo.core.clientImpl.DelegationTokenImpl;
-import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,11 +48,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-/**
- * @since 1.6.0
- * @deprecated since 2.0.0
- */
-@Deprecated
+@SuppressWarnings("deprecation")
 public class ConfiguratorBase {
 
   protected static final Logger log = Logger.getLogger(ConfiguratorBase.class);
@@ -71,7 +62,7 @@ public class ConfiguratorBase {
     IS_CONFIGURED, PRINCIPAL, TOKEN
   }
 
-  public static enum TokenSource {
+  public enum TokenSource {
     FILE, INLINE, JOB;
 
     private String prefix;
@@ -85,8 +76,13 @@ public class ConfiguratorBase {
     }
   }
 
-  public enum ClientOpts {
-    CLIENT_PROPS, CLIENT_PROPS_FILE
+  /**
+   * Configuration keys for available Instance types.
+   *
+   * @since 1.6.0
+   */
+  public enum InstanceOpts {
+    TYPE, NAME, ZOO_KEEPERS, CLIENT_CONFIG;
   }
 
   /**
@@ -125,91 +121,6 @@ public class ConfiguratorBase {
         + StringUtils.camelize(e.name().toLowerCase());
   }
 
-  public static Properties updateToken(org.apache.hadoop.security.Credentials credentials,
-      Properties props) {
-    Properties result = new Properties();
-    props.forEach((key, value) -> result.setProperty((String) key, (String) value));
-
-    AuthenticationToken token = ClientProperty.getAuthenticationToken(result);
-    if (token instanceof KerberosToken) {
-      log.info("Received KerberosToken, attempting to fetch DelegationToken");
-      try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
-        AuthenticationToken delegationToken = client.securityOperations()
-            .getDelegationToken(new DelegationTokenConfig());
-        ClientProperty.setAuthenticationToken(result, delegationToken);
-      } catch (Exception e) {
-        log.warn("Failed to automatically obtain DelegationToken, "
-            + "Mappers/Reducers will likely fail to communicate with Accumulo", e);
-      }
-    }
-    // DelegationTokens can be passed securely from user to task without serializing insecurely in
-    // the configuration
-    if (token instanceof DelegationTokenImpl) {
-      DelegationTokenImpl delegationToken = (DelegationTokenImpl) token;
-
-      // Convert it into a Hadoop Token
-      AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier();
-      Token<AuthenticationTokenIdentifier> hadoopToken = new Token<>(identifier.getBytes(),
-          delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName());
-
-      // Add the Hadoop Token to the Job so it gets serialized and passed along.
-      credentials.addToken(hadoopToken.getService(), hadoopToken);
-    }
-    return result;
-  }
-
-  public static void setClientProperties(Class<?> implementingClass, Configuration conf,
-      Properties props) {
-    StringWriter writer = new StringWriter();
-    try {
-      props.store(writer, "client properties");
-    } catch (IOException e) {
-      throw new IllegalStateException(e);
-    }
-    conf.set(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS), writer.toString());
-    conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
-  }
-
-  public static Properties getClientProperties(Class<?> implementingClass, Configuration conf) {
-    String propString;
-    String clientPropsFile = conf
-        .get(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS_FILE), "");
-    if (!clientPropsFile.isEmpty()) {
-      try {
-        URI[] uris = DistributedCacheHelper.getCacheFiles(conf);
-        Path path = null;
-        for (URI u : uris) {
-          if (u.toString().equals(clientPropsFile)) {
-            path = new Path(u);
-          }
-        }
-        FileSystem fs = FileSystem.get(conf);
-        FSDataInputStream inputStream = fs.open(path);
-        StringBuilder sb = new StringBuilder();
-        try (Scanner scanner = new Scanner(inputStream)) {
-          while (scanner.hasNextLine()) {
-            sb.append(scanner.nextLine() + "\n");
-          }
-        }
-        propString = sb.toString();
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "Failed to read client properties from distributed cache: " + clientPropsFile);
-      }
-    } else {
-      propString = conf.get(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS), "");
-    }
-    Properties props = new Properties();
-    if (!propString.isEmpty()) {
-      try {
-        props.load(new StringReader(propString));
-      } catch (IOException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-    return props;
-  }
-
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    *
@@ -230,13 +141,24 @@ public class ConfiguratorBase {
    */
   public static void setConnectorInfo(Class<?> implementingClass, Configuration conf,
       String principal, AuthenticationToken token) {
+    if (isConnectorInfoSet(implementingClass, conf))
+      throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName()
+          + " can only be set once per job");
     checkArgument(principal != null, "principal is null");
     checkArgument(token != null, "token is null");
-    Properties props = getClientProperties(implementingClass, conf);
-    props.setProperty(ClientProperty.AUTH_PRINCIPAL.getKey(), principal);
-    ClientProperty.setAuthenticationToken(props, token);
-    setClientProperties(implementingClass, conf, props);
     conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
+    conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal);
+    if (token instanceof DelegationTokenImpl) {
+      // Avoid serializing the DelegationToken secret in the configuration -- the Job will do that
+      // work for us securely
+      DelegationTokenImpl delToken = (DelegationTokenImpl) token;
+      conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), TokenSource.JOB.prefix()
+          + token.getClass().getName() + ":" + delToken.getServiceName().toString());
+    } else {
+      conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN),
+          TokenSource.INLINE.prefix() + token.getClass().getName() + ":"
+              + Base64.getEncoder().encodeToString(AuthenticationTokenSerializer.serialize(token)));
+    }
   }
 
   /**
@@ -306,8 +228,7 @@ public class ConfiguratorBase {
    * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
    */
   public static String getPrincipal(Class<?> implementingClass, Configuration conf) {
-    Properties props = getClientProperties(implementingClass, conf);
-    return props.getProperty(ClientProperty.AUTH_PRINCIPAL.getKey());
+    return conf.get(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL));
   }
 
   /**
@@ -321,15 +242,79 @@ public class ConfiguratorBase {
    * @return the principal's authentication token
    * @since 1.6.0
    * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
+   * @see #setConnectorInfo(Class, Configuration, String, String)
    */
   public static AuthenticationToken getAuthenticationToken(Class<?> implementingClass,
       Configuration conf) {
-    Properties props = getClientProperties(implementingClass, conf);
-    return ClientProperty.getAuthenticationToken(props);
+    String token = conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN));
+    if (token == null || token.isEmpty())
+      return null;
+    if (token.startsWith(TokenSource.INLINE.prefix())) {
+      String[] args = token.substring(TokenSource.INLINE.prefix().length()).split(":", 2);
+      if (args.length == 2)
+        return AuthenticationTokenSerializer.deserialize(args[0],
+            Base64.getDecoder().decode(args[1]));
+    } else if (token.startsWith(TokenSource.FILE.prefix())) {
+      String tokenFileName = token.substring(TokenSource.FILE.prefix().length());
+      return getTokenFromFile(conf, getPrincipal(implementingClass, conf), tokenFileName);
+    } else if (token.startsWith(TokenSource.JOB.prefix())) {
+      String[] args = token.substring(TokenSource.JOB.prefix().length()).split(":", 2);
+      if (args.length == 2) {
+        String className = args[0], serviceName = args[1];
+        if (DelegationTokenImpl.class.getName().equals(className)) {
+          return new org.apache.accumulo.core.clientImpl.mapreduce.DelegationTokenStub(serviceName);
+        }
+      }
+    }
+
+    throw new IllegalStateException("Token was not properly serialized into the configuration");
+  }
+
+  /**
+   * Reads the token for the given principal from a token file in the distributed cache. Each line
+   * of the file holds one serialized set of credentials, e.g. principal:token_class:token
+   *
+   * @param conf
+   *          the Hadoop context for the configured job
+   * @return the authentication token stored for the given principal in the token file
+   * @since 1.6.0
+   * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
+   */
+  public static AuthenticationToken getTokenFromFile(Configuration conf, String principal,
+      String tokenFile) {
+    FSDataInputStream in = null;
+    try {
+      URI[] uris = DistributedCacheHelper.getCacheFiles(conf);
+      Path path = null;
+      for (URI u : uris) {
+        if (u.toString().equals(tokenFile)) {
+          path = new Path(u);
+        }
+      }
+      if (path == null) {
+        throw new IllegalArgumentException(
+            "Couldn't find password file called \"" + tokenFile + "\" in cache.");
+      }
+      FileSystem fs = FileSystem.get(conf);
+      in = fs.open(path);
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          "Couldn't open password file called \"" + tokenFile + "\".");
+    }
+    try (java.util.Scanner fileScanner = new java.util.Scanner(in)) {
+      while (fileScanner.hasNextLine()) {
+        Credentials creds = Credentials.deserialize(fileScanner.nextLine());
+        if (principal.equals(creds.getPrincipal())) {
+          return creds.getToken();
+        }
+      }
+      throw new IllegalArgumentException(
+          "Couldn't find token for user \"" + principal + "\" in file \"" + tokenFile + "\"");
+    }
   }
 
   /**
-   * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job.
+   * Configures a ZooKeeperInstance for this job.
    *
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -341,19 +326,20 @@ public class ConfiguratorBase {
    */
   public static void setZooKeeperInstance(Class<?> implementingClass, Configuration conf,
       org.apache.accumulo.core.client.ClientConfiguration clientConfig) {
-    Properties props = getClientProperties(implementingClass, conf);
-    Properties newProps = ClientConfConverter.toProperties(clientConfig);
-    for (Object keyObj : newProps.keySet()) {
-      String propKey = (String) keyObj;
-      String val = newProps.getProperty(propKey);
-      props.setProperty(propKey, val);
+    String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
+    if (!conf.get(key, "").isEmpty())
+      throw new IllegalStateException(
+          "Instance info can only be set once per job; it has already been configured with "
+              + conf.get(key));
+    conf.set(key, "ZooKeeperInstance");
+    if (clientConfig != null) {
+      conf.set(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG),
+          clientConfig.serialize());
     }
-    setClientProperties(implementingClass, conf, props);
   }
 
   /**
-   * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the
-   * configuration.
+   * Initializes an Accumulo Instance based on the configuration.
    *
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -364,9 +350,15 @@ public class ConfiguratorBase {
    */
   public static org.apache.accumulo.core.client.Instance getInstance(Class<?> implementingClass,
       Configuration conf) {
-    ClientInfo info = ClientInfo.from(getClientProperties(implementingClass, conf));
-    return new org.apache.accumulo.core.client.ZooKeeperInstance(info.getZooKeepers(),
-        info.getZooKeepers());
+    String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE), "");
+    if ("ZooKeeperInstance".equals(instanceType)) {
+      return new org.apache.accumulo.core.client.ZooKeeperInstance(
+          getClientConfiguration(implementingClass, conf));
+    } else if (instanceType.isEmpty())
+      throw new IllegalStateException(
+          "Instance has not been configured for " + implementingClass.getSimpleName());
+    else
+      throw new IllegalStateException("Unrecognized instance type " + instanceType);
   }
 
   /**
@@ -382,7 +374,23 @@ public class ConfiguratorBase {
    */
   public static org.apache.accumulo.core.client.ClientConfiguration getClientConfiguration(
       Class<?> implementingClass, Configuration conf) {
-    return ClientConfConverter.toClientConf(getClientProperties(implementingClass, conf));
+    String clientConfigString = conf
+        .get(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG));
+    if (null != clientConfigString) {
+      return org.apache.accumulo.core.client.ClientConfiguration.deserialize(clientConfigString);
+    }
+
+    String instanceName = conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME));
+    String zookeepers = conf.get(enumToConfKey(implementingClass, InstanceOpts.ZOO_KEEPERS));
+    org.apache.accumulo.core.client.ClientConfiguration clientConf = org.apache.accumulo.core.client.ClientConfiguration
+        .loadDefault();
+    if (null != instanceName) {
+      clientConf.withInstance(instanceName);
+    }
+    if (null != zookeepers) {
+      clientConf.withZkHosts(zookeepers);
+    }
+    return clientConf;
   }
 
   /**
@@ -443,9 +451,8 @@ public class ConfiguratorBase {
   }
 
   /**
-   * Unwraps the provided {@link AuthenticationToken} if it is an instance of
-   * {@link org.apache.accumulo.core.clientImpl.mapreduce.DelegationTokenStub}, reconstituting it
-   * from the provided {@link JobConf}.
+   * Unwraps the provided {@link AuthenticationToken} if it is an instance of DelegationTokenStub,
+   * reconstituting it from the provided {@link JobConf}.
    *
    * @param job
    *          The job
@@ -474,9 +481,8 @@ public class ConfiguratorBase {
   }
 
   /**
-   * Unwraps the provided {@link AuthenticationToken} if it is an instance of
-   * {@link org.apache.accumulo.core.clientImpl.mapreduce.DelegationTokenStub}, reconstituting it
-   * from the provided {@link JobConf}.
+   * Unwraps the provided {@link AuthenticationToken} if it is an instance of DelegationTokenStub,
+   * reconstituting it from the provided {@link JobConf}.
    *
    * @param job
    *          The job
@@ -503,4 +509,40 @@ public class ConfiguratorBase {
     }
     return token;
   }
+
+  public static ClientContext client(Class<?> CLASS, Configuration conf)
+      throws AccumuloException, AccumuloSecurityException {
+    return ((org.apache.accumulo.core.clientImpl.ConnectorImpl) getInstance(CLASS, conf)
+        .getConnector(getPrincipal(CLASS, conf), getAuthenticationToken(CLASS, conf)))
+            .getAccumuloClient();
+  }
+
+  public static ClientContext client(Class<?> CLASS,
+      org.apache.accumulo.core.client.mapreduce.RangeInputSplit split, Configuration conf)
+      throws IOException {
+    try {
+      org.apache.accumulo.core.client.Instance instance = split
+          .getInstance(getClientConfiguration(CLASS, conf));
+      if (instance == null) {
+        instance = getInstance(CLASS, conf);
+      }
+
+      String principal = split.getPrincipal();
+      if (principal == null) {
+        principal = getPrincipal(CLASS, conf);
+      }
+
+      AuthenticationToken token = split.getToken();
+      if (token == null) {
+        token = getAuthenticationToken(CLASS, conf);
+      }
+
+      return ((org.apache.accumulo.core.clientImpl.ConnectorImpl) instance.getConnector(principal,
+          token)).getAccumuloClient();
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      throw new IOException(e);
+    }
+
+  }
+
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/FileOutputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/FileOutputConfigurator.java
index 105fe51..f23e880 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/FileOutputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/FileOutputConfigurator.java
@@ -32,9 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 
 /**
  * @since 1.6.0
- * @deprecated since 2.0.0
  */
-@Deprecated
 public class FileOutputConfigurator extends ConfiguratorBase {
 
   /**
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java
index d1915ba..a15ae8a 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java
@@ -36,7 +36,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
 
-import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -49,7 +48,6 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.clientImpl.ClientInfo;
 import org.apache.accumulo.core.clientImpl.Table;
 import org.apache.accumulo.core.clientImpl.Tables;
 import org.apache.accumulo.core.data.Key;
@@ -74,11 +72,7 @@ import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.Maps;
 
-/**
- * @since 1.6.0
- * @deprecated since 2.0.0
- */
-@Deprecated
+@SuppressWarnings("deprecation")
 public class InputConfigurator extends ConfiguratorBase {
 
   /**
@@ -94,8 +88,7 @@ public class InputConfigurator extends ConfiguratorBase {
     ITERATORS,
     TABLE_CONFIGS,
     SAMPLER_CONFIG,
-    CLASSLOADER_CONTEXT,
-    EXECUTION_HINTS
+    CLASSLOADER_CONTEXT
   }
 
   /**
@@ -634,8 +627,7 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param conf
    *          the Hadoop configuration object to configure
    * @param configs
-   *          an array of {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} objects
-   *          to associate with the job
+   *          an array of InputTableConfig objects to associate with the job
    * @since 1.6.0
    */
   public static void setInputTableConfigs(Class<?> implementingClass, Configuration conf,
@@ -657,8 +649,7 @@ public class InputConfigurator extends ConfiguratorBase {
   }
 
   /**
-   * Returns all {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} objects
-   * associated with this job.
+   * Returns all InputTableConfig objects associated with this job.
    *
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -695,8 +686,7 @@ public class InputConfigurator extends ConfiguratorBase {
   }
 
   /**
-   * Returns the {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} for the given
-   * table
+   * Returns the InputTableConfig for the given table
    *
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -727,8 +717,8 @@ public class InputConfigurator extends ConfiguratorBase {
       throws IOException {
     Map<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig> inputTableConfigs = getInputTableConfigs(
         implementingClass, conf);
-    try (AccumuloClient client = Accumulo.newClient()
-        .from(getClientProperties(implementingClass, conf)).build()) {
+    try {
+      AccumuloClient client = client(implementingClass, conf);
       if (getInputTableConfigs(implementingClass, conf).size() == 0)
         throw new IOException("No table set.");
 
@@ -764,8 +754,8 @@ public class InputConfigurator extends ConfiguratorBase {
   }
 
   /**
-   * Returns the {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} for the
-   * configuration based on the properties set using the single-table input methods.
+   * Returns the InputTableConfig for the configuration based on the properties set using the
+   * single-table input methods.
    *
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -809,102 +799,100 @@ public class InputConfigurator extends ConfiguratorBase {
   }
 
   public static Map<String,Map<KeyExtent,List<Range>>> binOffline(Table.ID tableId,
-      List<Range> ranges, ClientInfo clientInfo) throws AccumuloException, TableNotFoundException {
+      List<Range> ranges, ClientContext context) throws AccumuloException, TableNotFoundException {
 
-    try (ClientContext context = new ClientContext(clientInfo)) {
-      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
+    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
 
+    if (Tables.getTableState(context, tableId) != TableState.OFFLINE) {
+      Tables.clearCache(context);
       if (Tables.getTableState(context, tableId) != TableState.OFFLINE) {
-        Tables.clearCache(context);
-        if (Tables.getTableState(context, tableId) != TableState.OFFLINE) {
-          throw new AccumuloException(
-              "Table is online tableId:" + tableId + " cannot scan table in offline mode ");
-        }
+        throw new AccumuloException(
+            "Table is online tableId:" + tableId + " cannot scan table in offline mode ");
       }
+    }
 
-      for (Range range : ranges) {
-        Text startRow;
-
-        if (range.getStartKey() != null)
-          startRow = range.getStartKey().getRow();
-        else
-          startRow = new Text();
-
-        Range metadataRange = new Range(new KeyExtent(tableId, startRow, null).getMetadataEntry(),
-            true, null, false);
-        Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-        MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-        scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
-        scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
-        scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
-        scanner.setRange(metadataRange);
-
-        RowIterator rowIter = new RowIterator(scanner);
-        KeyExtent lastExtent = null;
-        while (rowIter.hasNext()) {
-          Iterator<Map.Entry<Key,Value>> row = rowIter.next();
-          String last = "";
-          KeyExtent extent = null;
-          String location = null;
-
-          while (row.hasNext()) {
-            Map.Entry<Key,Value> entry = row.next();
-            Key key = entry.getKey();
-
-            if (key.getColumnFamily()
-                .equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
-              last = entry.getValue().toString();
-            }
-
-            if (key.getColumnFamily()
-                .equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
-                || key.getColumnFamily()
-                    .equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
-              location = entry.getValue().toString();
-            }
+    for (Range range : ranges) {
+      Text startRow;
+
+      if (range.getStartKey() != null)
+        startRow = range.getStartKey().getRow();
+      else
+        startRow = new Text();
+
+      Range metadataRange = new Range(new KeyExtent(tableId, startRow, null).getMetadataEntry(),
+          true, null, false);
+      Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
+      scanner.setRange(metadataRange);
+
+      RowIterator rowIter = new RowIterator(scanner);
+      KeyExtent lastExtent = null;
+      while (rowIter.hasNext()) {
+        Iterator<Map.Entry<Key,Value>> row = rowIter.next();
+        String last = "";
+        KeyExtent extent = null;
+        String location = null;
+
+        while (row.hasNext()) {
+          Map.Entry<Key,Value> entry = row.next();
+          Key key = entry.getKey();
+
+          if (key.getColumnFamily()
+              .equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
+            last = entry.getValue().toString();
+          }
 
-            if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
-              extent = new KeyExtent(key.getRow(), entry.getValue());
-            }
+          if (key.getColumnFamily()
+              .equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
+              || key.getColumnFamily()
+                  .equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
+            location = entry.getValue().toString();
+          }
 
+          if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
+            extent = new KeyExtent(key.getRow(), entry.getValue());
           }
 
-          if (location != null)
-            return null;
+        }
 
-          if (!extent.getTableId().equals(tableId)) {
-            throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
-          }
+        if (location != null)
+          return null;
 
-          if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
-            throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
-          }
+        if (!extent.getTableId().equals(tableId)) {
+          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
+        }
 
-          Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
-          if (tabletRanges == null) {
-            tabletRanges = new HashMap<>();
-            binnedRanges.put(last, tabletRanges);
-          }
+        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
+          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
+        }
 
-          List<Range> rangeList = tabletRanges.get(extent);
-          if (rangeList == null) {
-            rangeList = new ArrayList<>();
-            tabletRanges.put(extent, rangeList);
-          }
+        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
+        if (tabletRanges == null) {
+          tabletRanges = new HashMap<>();
+          binnedRanges.put(last, tabletRanges);
+        }
 
-          rangeList.add(range);
+        List<Range> rangeList = tabletRanges.get(extent);
+        if (rangeList == null) {
+          rangeList = new ArrayList<>();
+          tabletRanges.put(extent, rangeList);
+        }
 
-          if (extent.getEndRow() == null
-              || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
-            break;
-          }
+        rangeList.add(range);
 
-          lastExtent = extent;
+        if (extent.getEndRow() == null
+            || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
+          break;
         }
 
+        lastExtent = extent;
       }
-      return binnedRanges;
+
     }
+    return binnedRanges;
   }
 
   private static String toBase64(Writable writable) {
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/OutputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/OutputConfigurator.java
index 6d003c0..dd2508c 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/OutputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/OutputConfigurator.java
@@ -28,11 +28,6 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.hadoop.conf.Configuration;
 
-/**
- * @since 1.6.0
- * @deprecated since 2.0.0
- */
-@Deprecated
 public class OutputConfigurator extends ConfiguratorBase {
 
   /**
diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBaseTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBaseTest.java
index c465005..c365dd7 100644
--- a/core/src/test/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBaseTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBaseTest.java
@@ -21,22 +21,17 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.Properties;
+import java.util.Base64;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.clientImpl.ClientInfo;
-import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.junit.Test;
 
-/**
- * @deprecated since 2.0.0
- */
-@Deprecated
 public class ConfiguratorBaseTest {
 
   private enum PrivateTestingEnum {
@@ -63,22 +58,26 @@ public class ConfiguratorBaseTest {
     assertNotNull(token);
     assertEquals(PasswordToken.class, token.getClass());
     assertEquals(new PasswordToken("testPassword"), token);
+    assertEquals(
+        "inline:" + PasswordToken.class.getName() + ":"
+            + Base64.getEncoder().encodeToString(
+                AuthenticationTokenSerializer.serialize(new PasswordToken("testPassword"))),
+        conf.get(
+            ConfiguratorBase.enumToConfKey(this.getClass(), ConfiguratorBase.ConnectorInfo.TOKEN)));
   }
 
   @Test
   public void testSetConnectorInfoClassOfQConfigurationStringString() {
     Configuration conf = new Configuration();
     assertFalse(ConfiguratorBase.isConnectorInfoSet(this.getClass(), conf));
-    ConfiguratorBase.setConnectorInfo(this.getClass(), conf, "testUser",
-        new PasswordToken("testPass"));
+    ConfiguratorBase.setConnectorInfo(this.getClass(), conf, "testUser", "testFile");
     assertTrue(ConfiguratorBase.isConnectorInfoSet(this.getClass(), conf));
     assertEquals("testUser", ConfiguratorBase.getPrincipal(this.getClass(), conf));
-    assertEquals("testPass",
-        new String(((PasswordToken) ClientInfo
-            .from(ConfiguratorBase.getClientProperties(this.getClass(), conf))
-            .getAuthenticationToken()).getPassword()));
+    assertEquals("file:testFile", conf.get(
+        ConfiguratorBase.enumToConfKey(this.getClass(), ConfiguratorBase.ConnectorInfo.TOKEN)));
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void testSetZooKeeperInstance() {
     Configuration conf = new Configuration();
@@ -86,17 +85,19 @@ public class ConfiguratorBaseTest {
         org.apache.accumulo.core.client.ClientConfiguration.create()
             .withInstance("testInstanceName").withZkHosts("testZooKeepers").withSsl(true)
             .withZkTimeout(15000));
-
-    org.apache.accumulo.core.client.ClientConfiguration clientConf = ConfiguratorBase
-        .getClientConfiguration(this.getClass(), conf);
+    org.apache.accumulo.core.client.ClientConfiguration clientConf = org.apache.accumulo.core.client.ClientConfiguration
+        .deserialize(conf.get(ConfiguratorBase.enumToConfKey(this.getClass(),
+            ConfiguratorBase.InstanceOpts.CLIENT_CONFIG)));
     assertEquals("testInstanceName", clientConf
         .get(org.apache.accumulo.core.client.ClientConfiguration.ClientProperty.INSTANCE_NAME));
-
-    Properties props = ConfiguratorBase.getClientProperties(this.getClass(), conf);
-    assertEquals("testInstanceName", props.getProperty(ClientProperty.INSTANCE_NAME.getKey()));
-    assertEquals("testZooKeepers", props.getProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey()));
-    assertEquals("true", props.getProperty(ClientProperty.SSL_ENABLED.getKey()));
-    assertEquals("15000", props.getProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey()));
+    assertEquals("testZooKeepers", clientConf
+        .get(org.apache.accumulo.core.client.ClientConfiguration.ClientProperty.INSTANCE_ZK_HOST));
+    assertEquals("true", clientConf.get(
+        org.apache.accumulo.core.client.ClientConfiguration.ClientProperty.INSTANCE_RPC_SSL_ENABLED));
+    assertEquals("15000", clientConf.get(
+        org.apache.accumulo.core.client.ClientConfiguration.ClientProperty.INSTANCE_ZK_TIMEOUT));
+    assertEquals(org.apache.accumulo.core.client.ZooKeeperInstance.class.getSimpleName(), conf
+        .get(ConfiguratorBase.enumToConfKey(this.getClass(), ConfiguratorBase.InstanceOpts.TYPE)));
   }
 
   @Test
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
index 7e105aa..9744977 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.clientImpl.ClientInfo;
+import org.apache.accumulo.core.clientImpl.Credentials;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -46,6 +47,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Rule;
@@ -61,7 +63,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 public class TokenFileIT extends AccumuloClusterHarness {
   private static AssertionError e1 = null;
 
-  private static class MRTokenFileTester extends Configured implements Tool {
+  public static class MRTokenFileTester extends Configured implements Tool {
     private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
       Key key = null;
       int count = 0;
@@ -113,11 +115,11 @@ public class TokenFileIT extends AccumuloClusterHarness {
       job.setInputFormat(org.apache.accumulo.core.client.mapred.AccumuloInputFormat.class);
 
       ClientInfo info = getClientInfo();
+      org.apache.accumulo.core.client.mapred.AccumuloInputFormat.setZooKeeperInstance(job,
+          info.getInstanceName(), info.getZooKeepers());
       org.apache.accumulo.core.client.mapred.AccumuloInputFormat.setConnectorInfo(job, user,
           tokenFile);
       org.apache.accumulo.core.client.mapred.AccumuloInputFormat.setInputTableName(job, table1);
-      org.apache.accumulo.core.client.mapred.AccumuloInputFormat.setZooKeeperInstance(job,
-          info.getInstanceName(), info.getZooKeepers());
 
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -126,33 +128,31 @@ public class TokenFileIT extends AccumuloClusterHarness {
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Mutation.class);
 
+      org.apache.accumulo.core.client.mapred.AccumuloOutputFormat.setZooKeeperInstance(job,
+          info.getInstanceName(), info.getZooKeepers());
       org.apache.accumulo.core.client.mapred.AccumuloOutputFormat.setConnectorInfo(job, user,
           tokenFile);
       org.apache.accumulo.core.client.mapred.AccumuloOutputFormat.setCreateTables(job, false);
       org.apache.accumulo.core.client.mapred.AccumuloOutputFormat.setDefaultTableName(job, table2);
-      org.apache.accumulo.core.client.mapred.AccumuloOutputFormat.setZooKeeperInstance(job,
-          info.getInstanceName(), info.getZooKeepers());
 
       job.setNumReduceTasks(0);
 
-      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+      RunningJob rj = JobClient.runJob(job);
+      if (rj.isSuccessful()) {
+        return 0;
+      } else {
+        System.out.println(rj.getFailureInfo());
+        return 1;
+      }
     }
 
-    @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
-    public static void main(String[] args) throws Exception {
-      Configuration conf = CachedConfiguration.getInstance();
-      conf.set("hadoop.tmp.dir", new File(args[0]).getParent());
-      conf.set("mapreduce.framework.name", "local");
-      conf.set("mapreduce.cluster.local.dir",
-          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
-      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
-    }
   }
 
   @Rule
   public TemporaryFolder folder = new TemporaryFolder(
       new File(System.getProperty("user.dir") + "/target"));
 
+  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
   @Test
   public void testMR() throws Exception {
     String[] tableNames = getUniqueNames(2);
@@ -169,12 +169,20 @@ public class TokenFileIT extends AccumuloClusterHarness {
       }
       bw.close();
 
-      File tf = folder.newFile("client.properties");
+      File tf = folder.newFile("root_test.pw");
       try (PrintStream out = new PrintStream(tf)) {
-        getClientInfo().getProperties().store(out, "Credentials for " + getClass().getName());
+        String outString = new Credentials(getAdminPrincipal(), getAdminToken()).serialize();
+        out.println(outString);
       }
 
-      MRTokenFileTester.main(new String[] {tf.getAbsolutePath(), table1, table2});
+      Configuration conf = CachedConfiguration.getInstance();
+      conf.set("hadoop.tmp.dir", new File(tf.getAbsolutePath()).getParent());
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(),
+          new String[] {tf.getAbsolutePath(), table1, table2}));
+
       assertNull(e1);
 
       try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java
index 56b8fd3..ea0b146 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import org.apache.accumulo.core.cli.ClientOpts;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
@@ -80,13 +81,20 @@ public class RowHash extends Configured implements Tool {
       return tableName;
     }
 
-    public void setAccumuloConfigs(Job job) {
-      org.apache.accumulo.core.clientImpl.mapreduce.lib.InputConfigurator.setClientProperties(
-          org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.class,
-          job.getConfiguration(), this.getClientProperties());
-      org.apache.accumulo.core.clientImpl.mapreduce.lib.OutputConfigurator.setClientProperties(
-          org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.class,
-          job.getConfiguration(), this.getClientProperties());
+    public void setAccumuloConfigs(Job job) throws AccumuloSecurityException {
+      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setZooKeeperInstance(job,
+          this.instance, this.zookeepers);
+      org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setZooKeeperInstance(job,
+          this.instance, this.zookeepers);
+
+      final String principal = getPrincipal();
+      getTableName();
+
+      AuthenticationToken token = getToken();
+      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setConnectorInfo(job, principal,
+          token);
+      org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setConnectorInfo(job,
+          principal, token);
       org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setInputTableName(job,
           getTableName());
       org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanAuthorizations(job,
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
index 1e8917c..c6d92e9 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.clientImpl.ClientInfo;
+import org.apache.accumulo.core.clientImpl.Credentials;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -131,24 +132,21 @@ public class TokenFileIT extends AccumuloClusterHarness {
 
       job.waitForCompletion(true);
 
-      return job.isSuccessful() ? 0 : 1;
+      if (job.isSuccessful()) {
+        return 0;
+      } else {
+        System.out.println(job.getStatus().getFailureInfo());
+        return 1;
+      }
     }
 
-    @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
-    public static void main(String[] args) throws Exception {
-      Configuration conf = CachedConfiguration.getInstance();
-      conf.set("hadoop.tmp.dir", new File(args[0]).getParent());
-      conf.set("mapreduce.framework.name", "local");
-      conf.set("mapreduce.cluster.local.dir",
-          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
-      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
-    }
   }
 
   @Rule
   public TemporaryFolder folder = new TemporaryFolder(
       new File(System.getProperty("user.dir") + "/target"));
 
+  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
   @Test
   public void testMR() throws Exception {
     String[] tableNames = getUniqueNames(2);
@@ -165,12 +163,19 @@ public class TokenFileIT extends AccumuloClusterHarness {
       }
       bw.close();
 
-      File tf = folder.newFile("client.properties");
+      File tf = folder.newFile("root_test.pw");
       try (PrintStream out = new PrintStream(tf)) {
-        getClientInfo().getProperties().store(out, "Credentials for " + getClass().getName());
+        String outString = new Credentials(getAdminPrincipal(), getAdminToken()).serialize();
+        out.println(outString);
       }
 
-      MRTokenFileTester.main(new String[] {tf.getAbsolutePath(), table1, table2});
+      Configuration conf = CachedConfiguration.getInstance();
+      conf.set("hadoop.tmp.dir", new File(tf.getAbsolutePath()).getParent());
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(),
+          new String[] {tf.getAbsolutePath(), table1, table2}));
       assertNull(e1);
 
       try (Scanner scanner = c.createScanner(table2, new Authorizations())) {


Mime
View raw message