accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [2/2] git commit: ACCUMULO-1674 Cleanup M/R config with AuthenticationToken serialization
Date Sat, 07 Sep 2013 03:10:05 GMT
ACCUMULO-1674 Cleanup M/R config with AuthenticationToken serialization


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f0fcd6d1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f0fcd6d1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f0fcd6d1

Branch: refs/heads/master
Commit: f0fcd6d117918a0adb77ea1b295567860a410edb
Parents: de24f83
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Fri Sep 6 23:08:54 2013 -0400
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Fri Sep 6 23:08:54 2013 -0400

----------------------------------------------------------------------
 .../client/mapred/AccumuloOutputFormat.java     | 141 ++++++-----
 .../core/client/mapred/InputFormatBase.java     | 218 ++++++++---------
 .../client/mapreduce/AccumuloOutputFormat.java  | 143 ++++++-----
 .../core/client/mapreduce/InputFormatBase.java  | 243 +++++++++----------
 .../mapreduce/lib/util/ConfiguratorBase.java    | 136 +++++------
 .../mapreduce/lib/util/InputConfigurator.java   |  70 +++---
 .../lib/util/ConfiguratorBaseTest.java          | 119 +++++++++
 7 files changed, 590 insertions(+), 480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f0fcd6d1/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
index a91386f..908b8b3 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
@@ -67,10 +67,10 @@ import org.apache.log4j.Logger;
  * Other static methods are optional.
  */
 public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
-  
+
   private static final Class<?> CLASS = AccumuloOutputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
-  
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -90,7 +90,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
     OutputConfigurator.setConnectorInfo(CLASS, job, principal, token);
   }
-  
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -109,7 +109,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
     OutputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
   }
-  
+
   /**
    * Determines if the connector has been configured.
    * 
@@ -122,7 +122,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   protected static Boolean isConnectorInfoSet(JobConf job) {
     return OutputConfigurator.isConnectorInfoSet(CLASS, job);
   }
-  
+
   /**
    * Gets the principal from the configuration.
    * 
@@ -135,34 +135,43 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   protected static String getPrincipal(JobConf job) {
     return OutputConfigurator.getPrincipal(CLASS, job);
   }
-  
+
   /**
-   * Gets the serialized token class from the configuration.
+   * Gets the serialized token class from either the configuration or the token file.
    * 
-   * @param job
-   *          the Hadoop context for the configured job
-   * @return the user name
    * @since 1.5.0
-   * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
+   * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobConf)} instead.
    */
+  @Deprecated
   protected static String getTokenClass(JobConf job) {
-    return OutputConfigurator.getTokenClass(CLASS, job);
+    return getAuthenticationToken(job).getClass().getName();
   }
-  
+
   /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
+   * Gets the serialized token from either the configuration or the token file.
    * 
-   * @param job
-   *          the Hadoop context for the configured job
-   * @return the decoded user password
    * @since 1.5.0
-   * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
+   * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobConf)} instead.
    */
+  @Deprecated
   protected static byte[] getToken(JobConf job) {
-    return OutputConfigurator.getToken(CLASS, job);
+    return AuthenticationTokenSerializer.serialize(getAuthenticationToken(job));
   }
-  
+
+  /**
+   * Gets the authentication token from either the specified token file or directly from the configuration, whichever was used when the job was configured.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @return the principal's authentication token
+   * @since 1.6.0
+   * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
+   * @see #setConnectorInfo(JobConf, String, String)
+   */
+  protected static AuthenticationToken getAuthenticationToken(JobConf job) {
+    return OutputConfigurator.getAuthenticationToken(CLASS, job);
+  }
+
   /**
    * Configures a {@link ZooKeeperInstance} for this job.
    * 
@@ -177,7 +186,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
     OutputConfigurator.setZooKeeperInstance(CLASS, job, instanceName, zooKeepers);
   }
-  
+
   /**
    * Configures a {@link MockInstance} for this job.
    * 
@@ -190,7 +199,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   public static void setMockInstance(JobConf job, String instanceName) {
     OutputConfigurator.setMockInstance(CLASS, job, instanceName);
   }
-  
+
   /**
    * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
@@ -204,7 +213,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   protected static Instance getInstance(JobConf job) {
     return OutputConfigurator.getInstance(CLASS, job);
   }
-  
+
   /**
    * Sets the log level for this job.
    * 
@@ -217,7 +226,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   public static void setLogLevel(JobConf job, Level level) {
     OutputConfigurator.setLogLevel(CLASS, job, level);
   }
-  
+
   /**
    * Gets the log level from this configuration.
    * 
@@ -230,7 +239,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   protected static Level getLogLevel(JobConf job) {
     return OutputConfigurator.getLogLevel(CLASS, job);
   }
-  
+
   /**
    * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and
    * underscores.
@@ -244,7 +253,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   public static void setDefaultTableName(JobConf job, String tableName) {
     OutputConfigurator.setDefaultTableName(CLASS, job, tableName);
   }
-  
+
   /**
    * Gets the default table name from the configuration.
    * 
@@ -257,7 +266,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   protected static String getDefaultTableName(JobConf job) {
     return OutputConfigurator.getDefaultTableName(CLASS, job);
   }
-  
+
   /**
    * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is
    * used. Setting the configuration multiple times overwrites any previous configuration.
@@ -271,7 +280,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   public static void setBatchWriterOptions(JobConf job, BatchWriterConfig bwConfig) {
     OutputConfigurator.setBatchWriterOptions(CLASS, job, bwConfig);
   }
-  
+
   /**
    * Gets the {@link BatchWriterConfig} settings.
    * 
@@ -284,7 +293,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   protected static BatchWriterConfig getBatchWriterOptions(JobConf job) {
     return OutputConfigurator.getBatchWriterOptions(CLASS, job);
   }
-  
+
   /**
    * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores.
    * 
@@ -300,7 +309,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   public static void setCreateTables(JobConf job, boolean enableFeature) {
     OutputConfigurator.setCreateTables(CLASS, job, enableFeature);
   }
-  
+
   /**
    * Determines whether tables are permitted to be created as needed.
    * 
@@ -313,7 +322,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   protected static Boolean canCreateTables(JobConf job) {
     return OutputConfigurator.canCreateTables(CLASS, job);
   }
-  
+
   /**
    * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing.
    * 
@@ -329,7 +338,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   public static void setSimulationMode(JobConf job, boolean enableFeature) {
     OutputConfigurator.setSimulationMode(CLASS, job, enableFeature);
   }
-  
+
   /**
    * Determines whether this feature is enabled.
    * 
@@ -342,7 +351,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
   protected static Boolean getSimulationMode(JobConf job) {
     return OutputConfigurator.getSimulationMode(CLASS, job);
   }
-  
+
   /**
    * A base class to be used to create {@link RecordWriter} instances that write to Accumulo.
    */
@@ -350,36 +359,36 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
     private MultiTableBatchWriter mtbw = null;
     private HashMap<Text,BatchWriter> bws = null;
     private Text defaultTableName = null;
-    
+
     private boolean simulate = false;
     private boolean createTables = false;
-    
+
     private long mutCount = 0;
     private long valCount = 0;
-    
+
     private Connector conn;
-    
+
     protected AccumuloRecordWriter(JobConf job) throws AccumuloException, AccumuloSecurityException, IOException {
       Level l = getLogLevel(job);
       if (l != null)
         log.setLevel(getLogLevel(job));
       this.simulate = getSimulationMode(job);
       this.createTables = canCreateTables(job);
-      
+
       if (simulate)
         log.info("Simulating output only. No writes to tables will occur");
-      
+
       this.bws = new HashMap<Text,BatchWriter>();
-      
+
       String tname = getDefaultTableName(job);
       this.defaultTableName = (tname == null) ? null : new Text(tname);
-      
+
       if (!simulate) {
-        this.conn = getInstance(job).getConnector(getPrincipal(job), AuthenticationTokenSerializer.deserialize(getTokenClass(job), getToken(job)));
+        this.conn = getInstance(job).getConnector(getPrincipal(job), getAuthenticationToken(job));
         mtbw = conn.createMultiTableBatchWriter(getBatchWriterOptions(job));
       }
     }
-    
+
     /**
      * Push a mutation into a table. If table is null, the defaultTable will be used. If canCreateTable is set, the table will be created if it does not exist.
      * The table name must only contain alphanumerics and underscore.
@@ -388,17 +397,17 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
     public void write(Text table, Mutation mutation) throws IOException {
       if (table == null || table.toString().isEmpty())
         table = this.defaultTableName;
-      
+
       if (!simulate && table == null)
         throw new IOException("No table or default table specified. Try simulation mode next time");
-      
+
       ++mutCount;
       valCount += mutation.size();
       printMutation(table, mutation);
-      
+
       if (simulate)
         return;
-      
+
       if (!bws.containsKey(table))
         try {
           addTable(table);
@@ -406,24 +415,24 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
           e.printStackTrace();
           throw new IOException(e);
         }
-      
+
       try {
         bws.get(table).addMutation(mutation);
       } catch (MutationsRejectedException e) {
         throw new IOException(e);
       }
     }
-    
+
     public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException {
       if (simulate) {
         log.info("Simulating adding table: " + tableName);
         return;
       }
-      
+
       log.debug("Adding table: " + tableName);
       BatchWriter bw = null;
       String table = tableName.toString();
-      
+
       if (createTables && !conn.tableOperations().exists(table)) {
         try {
           conn.tableOperations().create(table);
@@ -434,7 +443,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
           // Shouldn't happen
         }
       }
-      
+
       try {
         bw = mtbw.getBatchWriter(table);
       } catch (TableNotFoundException e) {
@@ -445,11 +454,11 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
       } catch (AccumuloSecurityException e) {
         throw e;
       }
-      
+
       if (bw != null)
         bws.put(tableName, bw);
     }
-    
+
     private int printMutation(Text table, Mutation m) {
       if (log.isTraceEnabled()) {
         log.trace(String.format("Table %s row key: %s", table, hexDump(m.getRow())));
@@ -461,7 +470,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
       }
       return m.getUpdates().size();
     }
-    
+
     private String hexDump(byte[] ba) {
       StringBuilder sb = new StringBuilder();
       for (byte b : ba) {
@@ -472,13 +481,13 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
       }
       return sb.toString();
     }
-    
+
     @Override
     public void close(Reporter reporter) throws IOException {
       log.debug("mutations written: " + mutCount + ", values written: " + valCount);
       if (simulate)
         return;
-      
+
       try {
         mtbw.close();
       } catch (MutationsRejectedException e) {
@@ -492,25 +501,27 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
             }
             secCodes.addAll(ke.getValue());
           }
-          
+
           log.error("Not authorized to write to tables : " + tables);
         }
-        
+
         if (e.getConstraintViolationSummaries().size() > 0) {
           log.error("Constraint violations : " + e.getConstraintViolationSummaries().size());
         }
       }
     }
   }
-  
+
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
     if (!isConnectorInfoSet(job))
       throw new IOException("Connector info has not been set.");
     try {
       // if the instance isn't configured, it will complain here
-      Connector c = getInstance(job).getConnector(getPrincipal(job), AuthenticationTokenSerializer.deserialize(getTokenClass(job), getToken(job)));
-      if (!c.securityOperations().authenticateUser(getPrincipal(job), AuthenticationTokenSerializer.deserialize(getTokenClass(job), getToken(job))))
+      String principal = getPrincipal(job);
+      AuthenticationToken token = getAuthenticationToken(job);
+      Connector c = getInstance(job).getConnector(principal, token);
+      if (!c.securityOperations().authenticateUser(principal, token))
         throw new IOException("Unable to authenticate user");
     } catch (AccumuloException e) {
       throw new IOException(e);
@@ -518,7 +529,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
       throw new IOException(e);
     }
   }
-  
+
   @Override
   public RecordWriter<Text,Mutation> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
     try {
@@ -527,5 +538,5 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
       throw new IOException(e);
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f0fcd6d1/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
index 9f16ab8..c796cd2 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
@@ -79,10 +79,10 @@ import org.apache.log4j.Logger;
  * See {@link AccumuloInputFormat} for an example implementation.
  */
 public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
-  
+
   private static final Class<?> CLASS = AccumuloInputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
-  
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -102,7 +102,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
     InputConfigurator.setConnectorInfo(CLASS, job, principal, token);
   }
-  
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -121,7 +121,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
     InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
   }
-  
+
   /**
    * Determines if the connector has been configured.
    * 
@@ -134,7 +134,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Boolean isConnectorInfoSet(JobConf job) {
     return InputConfigurator.isConnectorInfoSet(CLASS, job);
   }
-  
+
   /**
    * Gets the user name from the configuration.
    * 
@@ -147,48 +147,43 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static String getPrincipal(JobConf job) {
     return InputConfigurator.getPrincipal(CLASS, job);
   }
-  
+
   /**
-   * Gets the serialized token class from the configuration.
+   * Gets the serialized token class from either the configuration or the token file.
    * 
-   * @param job
-   *          the Hadoop context for the configured job
-   * @return the user name
    * @since 1.5.0
-   * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
+   * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobConf)} instead.
    */
+  @Deprecated
   protected static String getTokenClass(JobConf job) {
-    return InputConfigurator.getTokenClass(CLASS, job);
+    return getAuthenticationToken(job).getClass().getName();
   }
-  
+
   /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
+   * Gets the serialized token from either the configuration or the token file.
    * 
-   * @param job
-   *          the Hadoop context for the configured job
-   * @return the decoded user password
    * @since 1.5.0
-   * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
+   * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobConf)} instead.
    */
+  @Deprecated
   protected static byte[] getToken(JobConf job) {
-    return InputConfigurator.getToken(CLASS, job);
+    return AuthenticationTokenSerializer.serialize(getAuthenticationToken(job));
   }
-  
+
   /**
-   * Gets the password file from the configuration. It is BASE64 encoded to provide a charset safe conversion to a string, and is not intended to be secure. If
-   * specified, the password will be stored in a file rather than in the Configuration.
+   * Gets the authentication token from either the specified token file or directly from the configuration, whichever was used when the job was configured.
    * 
    * @param job
    *          the Hadoop context for the configured job
-   * @return path to the password file as a String
+   * @return the principal's authentication token
    * @since 1.6.0
    * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
+   * @see #setConnectorInfo(JobConf, String, String)
    */
-  protected static String getTokenFile(JobConf job) {
-    return InputConfigurator.getTokenFile(CLASS, job);
+  protected static AuthenticationToken getAuthenticationToken(JobConf job) {
+    return InputConfigurator.getAuthenticationToken(CLASS, job);
   }
-  
+
   /**
    * Configures a {@link ZooKeeperInstance} for this job.
    * 
@@ -203,7 +198,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
     InputConfigurator.setZooKeeperInstance(CLASS, job, instanceName, zooKeepers);
   }
-  
+
   /**
    * Configures a {@link MockInstance} for this job.
    * 
@@ -216,7 +211,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setMockInstance(JobConf job, String instanceName) {
     InputConfigurator.setMockInstance(CLASS, job, instanceName);
   }
-  
+
   /**
    * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
@@ -230,7 +225,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Instance getInstance(JobConf job) {
     return InputConfigurator.getInstance(CLASS, job);
   }
-  
+
   /**
    * Sets the log level for this job.
    * 
@@ -243,7 +238,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setLogLevel(JobConf job, Level level) {
     InputConfigurator.setLogLevel(CLASS, job, level);
   }
-  
+
   /**
    * Gets the log level from this configuration.
    * 
@@ -256,7 +251,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Level getLogLevel(JobConf job) {
     return InputConfigurator.getLogLevel(CLASS, job);
   }
-  
+
   /**
    * Sets the name of the input table, over which this job will scan.
    * 
@@ -269,7 +264,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setInputTableName(JobConf job, String tableName) {
     InputConfigurator.setInputTableName(CLASS, job, tableName);
   }
-  
+
   /**
    * Gets the table name from the configuration.
    * 
@@ -282,7 +277,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static String getInputTableName(JobConf job) {
     return InputConfigurator.getInputTableName(CLASS, job);
   }
-  
+
   /**
    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
    * 
@@ -295,7 +290,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setScanAuthorizations(JobConf job, Authorizations auths) {
     InputConfigurator.setScanAuthorizations(CLASS, job, auths);
   }
-  
+
   /**
    * Gets the authorizations to set for the scans from the configuration.
    * 
@@ -308,7 +303,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Authorizations getScanAuthorizations(JobConf job) {
     return InputConfigurator.getScanAuthorizations(CLASS, job);
   }
-  
+
   /**
    * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
    * 
@@ -321,7 +316,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setRanges(JobConf job, Collection<Range> ranges) {
     InputConfigurator.setRanges(CLASS, job, ranges);
   }
-  
+
   /**
    * Gets the ranges to scan over from a job.
    * 
@@ -336,7 +331,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static List<Range> getRanges(JobConf job) throws IOException {
     return InputConfigurator.getRanges(CLASS, job);
   }
-  
+
   /**
    * Restricts the columns that will be mapped over for this job.
    * 
@@ -350,7 +345,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void fetchColumns(JobConf job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     InputConfigurator.fetchColumns(CLASS, job, columnFamilyColumnQualifierPairs);
   }
-  
+
   /**
    * Gets the columns to be mapped over from this job.
    * 
@@ -363,7 +358,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Set<Pair<Text,Text>> getFetchedColumns(JobConf job) {
     return InputConfigurator.getFetchedColumns(CLASS, job);
   }
-  
+
   /**
    * Encode an iterator on the input for this job.
    * 
@@ -376,7 +371,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void addIterator(JobConf job, IteratorSetting cfg) {
     InputConfigurator.addIterator(CLASS, job, cfg);
   }
-  
+
   /**
    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
    * 
@@ -389,7 +384,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static List<IteratorSetting> getIterators(JobConf job) {
     return InputConfigurator.getIterators(CLASS, job);
   }
-  
+
   /**
    * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
    * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. *
@@ -407,7 +402,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setAutoAdjustRanges(JobConf job, boolean enableFeature) {
     InputConfigurator.setAutoAdjustRanges(CLASS, job, enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration has auto-adjust ranges enabled.
    * 
@@ -420,7 +415,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static boolean getAutoAdjustRanges(JobConf job) {
     return InputConfigurator.getAutoAdjustRanges(CLASS, job);
   }
-  
+
   /**
    * Controls the use of the {@link IsolatedScanner} in this job.
    * 
@@ -436,7 +431,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setScanIsolation(JobConf job, boolean enableFeature) {
     InputConfigurator.setScanIsolation(CLASS, job, enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration has isolation enabled.
    * 
@@ -449,7 +444,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static boolean isIsolated(JobConf job) {
     return InputConfigurator.isIsolated(CLASS, job);
   }
-  
+
   /**
    * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
    * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
@@ -466,7 +461,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setLocalIterators(JobConf job, boolean enableFeature) {
     InputConfigurator.setLocalIterators(CLASS, job, enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration uses local iterators.
    * 
@@ -479,7 +474,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static boolean usesLocalIterators(JobConf job) {
     return InputConfigurator.usesLocalIterators(CLASS, job);
   }
-  
+
   /**
    * <p>
    * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
@@ -514,7 +509,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setOfflineTableScan(JobConf job, boolean enableFeature) {
     InputConfigurator.setOfflineTableScan(CLASS, job, enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration has the offline table scan feature enabled.
    * 
@@ -527,7 +522,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static boolean isOfflineScan(JobConf job) {
     return InputConfigurator.isOfflineScan(CLASS, job);
   }
-  
+
   /**
    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
    * 
@@ -541,7 +536,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static TabletLocator getTabletLocator(JobConf job) throws TableNotFoundException {
     return InputConfigurator.getTabletLocator(CLASS, job);
   }
-  
+
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
    * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
@@ -555,7 +550,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static void validateOptions(JobConf job) throws IOException {
     InputConfigurator.validateOptions(CLASS, job);
   }
-  
+
   /**
    * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
    * types.
@@ -570,7 +565,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
     protected long numKeysRead;
     protected Iterator<Entry<Key,Value>> scannerIterator;
     protected RangeInputSplit split;
-    
+
     /**
      * Apply the configured iterators from the configuration to the scanner.
      * 
@@ -585,7 +580,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
         scanner.addScanIterator(iterator);
       }
     }
-    
+
     /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
@@ -595,18 +590,16 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       log.debug("Initializing input split: " + split.getRange());
       Instance instance = getInstance(job);
       String user = getPrincipal(job);
-      String tokenClass = getTokenClass(job);
-      byte[] password = getToken(job);
+      AuthenticationToken token = getAuthenticationToken(job);
       Authorizations authorizations = getScanAuthorizations(job);
-      
+
       try {
         log.debug("Creating connector with user: " + user);
-        Connector conn = instance.getConnector(user, AuthenticationTokenSerializer.deserialize(tokenClass, password));
+        Connector conn = instance.getConnector(user, token);
         log.debug("Creating scanner for table: " + getInputTableName(job));
         log.debug("Authorizations are: " + authorizations);
         if (isOfflineScan(job)) {
-          scanner = new OfflineScanner(instance, new Credentials(user, AuthenticationTokenSerializer.deserialize(tokenClass, password)), Tables.getTableId(
-              instance, getInputTableName(job)), authorizations);
+          scanner = new OfflineScanner(instance, new Credentials(user, token), Tables.getTableId(instance, getInputTableName(job)), authorizations);
         } else {
           scanner = conn.createScanner(getInputTableName(job), authorizations);
         }
@@ -622,7 +615,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       } catch (Exception e) {
         throw new IOException(e);
       }
-      
+
       // setup a scanner within the bounds of this split
       for (Pair<Text,Text> c : getFetchedColumns(job)) {
         if (c.getSecond() != null) {
@@ -633,58 +626,58 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
           scanner.fetchColumnFamily(c.getFirst());
         }
       }
-      
+
       scanner.setRange(split.getRange());
-      
+
       numKeysRead = 0;
-      
+
       // do this last after setting all scanner options
       scannerIterator = scanner.iterator();
     }
-    
+
     @Override
     public void close() {}
-    
+
     @Override
     public long getPos() throws IOException {
       return numKeysRead;
     }
-    
+
     @Override
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
       return split.getProgress(currentKey);
     }
-    
+
     protected Key currentKey = null;
-    
+
   }
-  
+
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableName, List<Range> ranges) throws TableNotFoundException, AccumuloException,
       AccumuloSecurityException {
-    
+
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    
+
     Instance instance = getInstance(job);
-    Connector conn = instance.getConnector(getPrincipal(job), AuthenticationTokenSerializer.deserialize(getTokenClass(job), getToken(job)));
+    Connector conn = instance.getConnector(getPrincipal(job), getAuthenticationToken(job));
     String tableId = Tables.getTableId(instance, tableName);
-    
+
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
       Tables.clearCache(instance);
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
         throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
       }
     }
-    
+
     for (Range range : ranges) {
       Text startRow;
-      
+
       if (range.getStartKey() != null)
         startRow = range.getStartKey().getRow();
       else
         startRow = new Text();
-      
+
       Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
       Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
@@ -692,73 +685,73 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
       scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
       scanner.setRange(metadataRange);
-      
+
       RowIterator rowIter = new RowIterator(scanner);
-      
+
       KeyExtent lastExtent = null;
-      
+
       while (rowIter.hasNext()) {
         Iterator<Entry<Key,Value>> row = rowIter.next();
         String last = "";
         KeyExtent extent = null;
         String location = null;
-        
+
         while (row.hasNext()) {
           Entry<Key,Value> entry = row.next();
           Key key = entry.getKey();
-          
+
           if (key.getColumnFamily().equals(TabletsSection.LastLocationColumnFamily.NAME)) {
             last = entry.getValue().toString();
           }
-          
+
           if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
               || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
             location = entry.getValue().toString();
           }
-          
+
           if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
             extent = new KeyExtent(key.getRow(), entry.getValue());
           }
-          
+
         }
-        
+
         if (location != null)
           return null;
-        
+
         if (!extent.getTableId().toString().equals(tableId)) {
           throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
         }
-        
+
         if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
           throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
         }
-        
+
         Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
         if (tabletRanges == null) {
           tabletRanges = new HashMap<KeyExtent,List<Range>>();
           binnedRanges.put(last, tabletRanges);
         }
-        
+
         List<Range> rangeList = tabletRanges.get(extent);
         if (rangeList == null) {
           rangeList = new ArrayList<Range>();
           tabletRanges.put(extent, rangeList);
         }
-        
+
         rangeList.add(range);
-        
+
         if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
           break;
         }
-        
+
         lastExtent = extent;
       }
-      
+
     }
-    
+
     return binnedRanges;
   }
-  
+
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
@@ -766,16 +759,16 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     log.setLevel(getLogLevel(job));
     validateOptions(job);
-    
+
     String tableName = getInputTableName(job);
     boolean autoAdjust = getAutoAdjustRanges(job);
     List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job)) : getRanges(job);
-    
+
     if (ranges.isEmpty()) {
       ranges = new ArrayList<Range>(1);
       ranges.add(new Range());
     }
-    
+
     // get the metadata information for these ranges
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     TabletLocator tl;
@@ -793,8 +786,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
         tl = getTabletLocator(job);
         // it's possible that the cache could contain complete, but old information about a table's tablets... so clear it
         tl.invalidateCache();
-        while (!tl.binRanges(new Credentials(getPrincipal(job), AuthenticationTokenSerializer.deserialize(getTokenClass(job), getToken(job))), ranges,
-            binnedRanges).isEmpty()) {
+        while (!tl.binRanges(new Credentials(getPrincipal(job), getAuthenticationToken(job)), ranges, binnedRanges).isEmpty()) {
           if (!(instance instanceof MockInstance)) {
             if (tableId == null)
               tableId = Tables.getTableId(instance, tableName);
@@ -812,15 +804,15 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
     } catch (Exception e) {
       throw new IOException(e);
     }
-    
+
     ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
     HashMap<Range,ArrayList<String>> splitsToAdd = null;
-    
+
     if (!autoAdjust)
       splitsToAdd = new HashMap<Range,ArrayList<String>>();
-    
+
     HashMap<String,String> hostNameCache = new HashMap<String,String>();
-    
+
     for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
       String ip = tserverBin.getKey().split(":", 2)[0];
       String location = hostNameCache.get(ip);
@@ -829,7 +821,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
         location = inetAddress.getHostName();
         hostNameCache.put(ip, location);
       }
-      
+
       for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
         Range ke = extentRanges.getKey().toDataRange();
         for (Range r : extentRanges.getValue()) {
@@ -847,30 +839,30 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
         }
       }
     }
-    
+
     if (!autoAdjust)
       for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
         splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
     return splits.toArray(new InputSplit[splits.size()]);
   }
-  
+
   /**
    * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
    */
   public static class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit implements InputSplit {
-    
+
     public RangeInputSplit() {
       super();
     }
-    
+
     public RangeInputSplit(RangeInputSplit split) throws IOException {
       super(split);
     }
-    
+
     protected RangeInputSplit(String table, Range range, String[] locations) {
       super(table, range, locations);
     }
-    
+
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f0fcd6d1/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index cefb634..727bfec 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -61,16 +61,17 @@ import org.apache.log4j.Logger;
  * 
  * <ul>
  * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
+ * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, String)}
  * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)}
  * </ul>
  * 
  * Other static methods are optional.
  */
 public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
-  
+
   private static final Class<?> CLASS = AccumuloOutputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
-  
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -90,7 +91,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
     OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
   }
-  
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -109,7 +110,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException {
     OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile);
   }
-  
+
   /**
    * Determines if the connector has been configured.
    * 
@@ -122,7 +123,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   protected static Boolean isConnectorInfoSet(JobContext context) {
     return OutputConfigurator.isConnectorInfoSet(CLASS, InputFormatBase.getConfiguration(context));
   }
-  
+
   /**
    * Gets the user name from the configuration.
    * 
@@ -135,34 +136,43 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   protected static String getPrincipal(JobContext context) {
     return OutputConfigurator.getPrincipal(CLASS, InputFormatBase.getConfiguration(context));
   }
-  
+
   /**
-   * Gets the serialized token class name from the configuration.
+   * Gets the serialized token class from either the configuration or the token file.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the user name
    * @since 1.5.0
-   * @see #setConnectorInfo(Job, String, AuthenticationToken)
+   * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead.
    */
+  @Deprecated
   protected static String getTokenClass(JobContext context) {
-    return OutputConfigurator.getTokenClass(CLASS, InputFormatBase.getConfiguration(context));
+    return getAuthenticationToken(context).getClass().getName();
+  }
+
+  /**
+   * Gets the serialized token from either the configuration or the token file.
+   * 
+   * @since 1.5.0
+   * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead.
+   */
+  @Deprecated
+  protected static byte[] getToken(JobContext context) {
+    return AuthenticationTokenSerializer.serialize(getAuthenticationToken(context));
   }
-  
+
   /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
+   * Gets the authentication token from either the specified token file or directly from the configuration, whichever was used when the job was configured.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the decoded user password
-   * @since 1.5.0
+   * @return the principal's authentication token
+   * @since 1.6.0
    * @see #setConnectorInfo(Job, String, AuthenticationToken)
+   * @see #setConnectorInfo(Job, String, String)
    */
-  protected static byte[] getToken(JobContext context) {
-    return OutputConfigurator.getToken(CLASS, InputFormatBase.getConfiguration(context));
+  protected static AuthenticationToken getAuthenticationToken(JobContext context) {
+    return OutputConfigurator.getAuthenticationToken(CLASS, InputFormatBase.getConfiguration(context));
   }
-  
+
   /**
    * Configures a {@link ZooKeeperInstance} for this job.
    * 
@@ -177,7 +187,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
     OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
   }
-  
+
   /**
    * Configures a {@link MockInstance} for this job.
    * 
@@ -190,7 +200,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   public static void setMockInstance(Job job, String instanceName) {
     OutputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName);
   }
-  
+
   /**
    * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
@@ -204,7 +214,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   protected static Instance getInstance(JobContext context) {
     return OutputConfigurator.getInstance(CLASS, InputFormatBase.getConfiguration(context));
   }
-  
+
   /**
    * Sets the log level for this job.
    * 
@@ -217,7 +227,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   public static void setLogLevel(Job job, Level level) {
     OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
   }
-  
+
   /**
    * Gets the log level from this configuration.
    * 
@@ -230,7 +240,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   protected static Level getLogLevel(JobContext context) {
     return OutputConfigurator.getLogLevel(CLASS, InputFormatBase.getConfiguration(context));
   }
-  
+
   /**
    * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and
    * underscores.
@@ -244,7 +254,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   public static void setDefaultTableName(Job job, String tableName) {
     OutputConfigurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName);
   }
-  
+
   /**
    * Gets the default table name from the configuration.
    * 
@@ -257,7 +267,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   protected static String getDefaultTableName(JobContext context) {
     return OutputConfigurator.getDefaultTableName(CLASS, InputFormatBase.getConfiguration(context));
   }
-  
+
   /**
    * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is
    * used. Setting the configuration multiple times overwrites any previous configuration.
@@ -271,7 +281,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
     OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig);
   }
-  
+
   /**
    * Gets the {@link BatchWriterConfig} settings.
    * 
@@ -284,7 +294,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   protected static BatchWriterConfig getBatchWriterOptions(JobContext context) {
     return OutputConfigurator.getBatchWriterOptions(CLASS, InputFormatBase.getConfiguration(context));
   }
-  
+
   /**
    * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores.
    * 
@@ -300,7 +310,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   public static void setCreateTables(Job job, boolean enableFeature) {
     OutputConfigurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature);
   }
-  
+
   /**
    * Determines whether tables are permitted to be created as needed.
    * 
@@ -313,7 +323,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   protected static Boolean canCreateTables(JobContext context) {
     return OutputConfigurator.canCreateTables(CLASS, InputFormatBase.getConfiguration(context));
   }
-  
+
   /**
    * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing.
    * 
@@ -329,7 +339,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   public static void setSimulationMode(Job job, boolean enableFeature) {
     OutputConfigurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature);
   }
-  
+
   /**
    * Determines whether this feature is enabled.
    * 
@@ -342,7 +352,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   protected static Boolean getSimulationMode(JobContext context) {
     return OutputConfigurator.getSimulationMode(CLASS, InputFormatBase.getConfiguration(context));
   }
-  
+
   /**
    * A base class to be used to create {@link RecordWriter} instances that write to Accumulo.
    */
@@ -350,37 +360,36 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
     private MultiTableBatchWriter mtbw = null;
     private HashMap<Text,BatchWriter> bws = null;
     private Text defaultTableName = null;
-    
+
     private boolean simulate = false;
     private boolean createTables = false;
-    
+
     private long mutCount = 0;
     private long valCount = 0;
-    
+
     private Connector conn;
-    
+
     protected AccumuloRecordWriter(TaskAttemptContext context) throws AccumuloException, AccumuloSecurityException, IOException {
       Level l = getLogLevel(context);
       if (l != null)
         log.setLevel(getLogLevel(context));
       this.simulate = getSimulationMode(context);
       this.createTables = canCreateTables(context);
-      
+
       if (simulate)
         log.info("Simulating output only. No writes to tables will occur");
-      
+
       this.bws = new HashMap<Text,BatchWriter>();
-      
+
       String tname = getDefaultTableName(context);
       this.defaultTableName = (tname == null) ? null : new Text(tname);
-      
+
       if (!simulate) {
-        this.conn = getInstance(context).getConnector(getPrincipal(context),
-            AuthenticationTokenSerializer.deserialize(getTokenClass(context), getToken(context)));
+        this.conn = getInstance(context).getConnector(getPrincipal(context), getAuthenticationToken(context));
         mtbw = conn.createMultiTableBatchWriter(getBatchWriterOptions(context));
       }
     }
-    
+
     /**
      * Push a mutation into a table. If table is null, the defaultTable will be used. If canCreateTable is set, the table will be created if it does not exist.
      * The table name must only contain alphanumerics and underscore.
@@ -389,17 +398,17 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
     public void write(Text table, Mutation mutation) throws IOException {
       if (table == null || table.toString().isEmpty())
         table = this.defaultTableName;
-      
+
       if (!simulate && table == null)
         throw new IOException("No table or default table specified. Try simulation mode next time");
-      
+
       ++mutCount;
       valCount += mutation.size();
       printMutation(table, mutation);
-      
+
       if (simulate)
         return;
-      
+
       if (!bws.containsKey(table))
         try {
           addTable(table);
@@ -407,24 +416,24 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
           e.printStackTrace();
           throw new IOException(e);
         }
-      
+
       try {
         bws.get(table).addMutation(mutation);
       } catch (MutationsRejectedException e) {
         throw new IOException(e);
       }
     }
-    
+
     public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException {
       if (simulate) {
         log.info("Simulating adding table: " + tableName);
         return;
       }
-      
+
       log.debug("Adding table: " + tableName);
       BatchWriter bw = null;
       String table = tableName.toString();
-      
+
       if (createTables && !conn.tableOperations().exists(table)) {
         try {
           conn.tableOperations().create(table);
@@ -435,7 +444,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
           // Shouldn't happen
         }
       }
-      
+
       try {
         bw = mtbw.getBatchWriter(table);
       } catch (TableNotFoundException e) {
@@ -446,11 +455,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
       } catch (AccumuloSecurityException e) {
         throw e;
       }
-      
+
       if (bw != null)
         bws.put(tableName, bw);
     }
-    
+
     private int printMutation(Text table, Mutation m) {
       if (log.isTraceEnabled()) {
         log.trace(String.format("Table %s row key: %s", table, hexDump(m.getRow())));
@@ -462,7 +471,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
       }
       return m.getUpdates().size();
     }
-    
+
     private String hexDump(byte[] ba) {
       StringBuilder sb = new StringBuilder();
       for (byte b : ba) {
@@ -473,13 +482,13 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
       }
       return sb.toString();
     }
-    
+
     @Override
     public void close(TaskAttemptContext attempt) throws IOException, InterruptedException {
       log.debug("mutations written: " + mutCount + ", values written: " + valCount);
       if (simulate)
         return;
-      
+
       try {
         mtbw.close();
       } catch (MutationsRejectedException e) {
@@ -493,25 +502,27 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
             }
             secCodes.addAll(ke.getValue());
           }
-          
+
           log.error("Not authorized to write to tables : " + tables);
         }
-        
+
         if (e.getConstraintViolationSummaries().size() > 0) {
           log.error("Constraint violations : " + e.getConstraintViolationSummaries().size());
         }
       }
     }
   }
-  
+
   @Override
   public void checkOutputSpecs(JobContext job) throws IOException {
     if (!isConnectorInfoSet(job))
       throw new IOException("Connector info has not been set.");
     try {
       // if the instance isn't configured, it will complain here
-      Connector c = getInstance(job).getConnector(getPrincipal(job), AuthenticationTokenSerializer.deserialize(getTokenClass(job), getToken(job)));
-      if (!c.securityOperations().authenticateUser(getPrincipal(job), AuthenticationTokenSerializer.deserialize(getTokenClass(job), getToken(job))))
+      String principal = getPrincipal(job);
+      AuthenticationToken token = getAuthenticationToken(job);
+      Connector c = getInstance(job).getConnector(principal, token);
+      if (!c.securityOperations().authenticateUser(principal, token))
         throw new IOException("Unable to authenticate user");
     } catch (AccumuloException e) {
       throw new IOException(e);
@@ -519,12 +530,12 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
       throw new IOException(e);
     }
   }
-  
+
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
     return new NullOutputFormat<Text,Mutation>().getOutputCommitter(context);
   }
-  
+
   @Override
   public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException {
     try {
@@ -533,5 +544,5 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
       throw new IOException(e);
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f0fcd6d1/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index f70f6e2..13f9708 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -87,10 +87,10 @@ import org.apache.log4j.Logger;
  * See {@link AccumuloInputFormat} for an example implementation.
  */
 public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
-  
+
   private static final Class<?> CLASS = AccumuloInputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
-  
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -110,7 +110,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
     InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
   }
-  
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -129,7 +129,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException {
     InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile);
   }
-  
+
   /**
    * Determines if the connector has been configured.
    * 
@@ -142,7 +142,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static Boolean isConnectorInfoSet(JobContext context) {
     return InputConfigurator.isConnectorInfoSet(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Gets the user name from the configuration.
    * 
@@ -155,47 +155,43 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static String getPrincipal(JobContext context) {
     return InputConfigurator.getPrincipal(CLASS, getConfiguration(context));
   }
-  
+
   /**
-   * Gets the serialized token class from the configuration.
+   * Gets the serialized token class from either the configuration or the token file.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the user name
    * @since 1.5.0
-   * @see #setConnectorInfo(Job, String, AuthenticationToken)
+   * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead.
    */
+  @Deprecated
   protected static String getTokenClass(JobContext context) {
-    return InputConfigurator.getTokenClass(CLASS, getConfiguration(context));
+    return getAuthenticationToken(context).getClass().getName();
   }
-  
+
   /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
+   * Gets the serialized token from either the configuration or the token file.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the decoded user password
    * @since 1.5.0
-   * @see #setConnectorInfo(Job, String, AuthenticationToken)
+   * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead.
    */
+  @Deprecated
   protected static byte[] getToken(JobContext context) {
-    return InputConfigurator.getToken(CLASS, getConfiguration(context));
+    return AuthenticationTokenSerializer.serialize(getAuthenticationToken(context));
   }
-  
+
   /**
-   * Gets the password file from the configuration.
+   * Gets the authentication token from either the specified token file or directly from the configuration, whichever was used when the job was configured.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return path to the password file as a String
+   * @return the principal's authentication token
    * @since 1.6.0
    * @see #setConnectorInfo(Job, String, AuthenticationToken)
+   * @see #setConnectorInfo(Job, String, String)
    */
-  protected static String getTokenFile(JobContext context) {
-    return InputConfigurator.getTokenFile(CLASS, getConfiguration(context));
+  protected static AuthenticationToken getAuthenticationToken(JobContext context) {
+    return InputConfigurator.getAuthenticationToken(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Configures a {@link ZooKeeperInstance} for this job.
    * 
@@ -210,7 +206,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
     InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
   }
-  
+
   /**
    * Configures a {@link MockInstance} for this job.
    * 
@@ -223,7 +219,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setMockInstance(Job job, String instanceName) {
     InputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName);
   }
-  
+
   /**
    * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
@@ -237,7 +233,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static Instance getInstance(JobContext context) {
     return InputConfigurator.getInstance(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Sets the log level for this job.
    * 
@@ -250,7 +246,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setLogLevel(Job job, Level level) {
     InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
   }
-  
+
   /**
    * Gets the log level from this configuration.
    * 
@@ -263,7 +259,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static Level getLogLevel(JobContext context) {
     return InputConfigurator.getLogLevel(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Sets the name of the input table, over which this job will scan.
    * 
@@ -276,7 +272,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setInputTableName(Job job, String tableName) {
     InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
   }
-  
+
   /**
    * Gets the table name from the configuration.
    * 
@@ -289,7 +285,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static String getInputTableName(JobContext context) {
     return InputConfigurator.getInputTableName(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
    * 
@@ -302,7 +298,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setScanAuthorizations(Job job, Authorizations auths) {
     InputConfigurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths);
   }
-  
+
   /**
    * Gets the authorizations to set for the scans from the configuration.
    * 
@@ -315,7 +311,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static Authorizations getScanAuthorizations(JobContext context) {
     return InputConfigurator.getScanAuthorizations(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
    * 
@@ -328,7 +324,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setRanges(Job job, Collection<Range> ranges) {
     InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
   }
-  
+
   /**
    * Gets the ranges to scan over from a job.
    * 
@@ -343,7 +339,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static List<Range> getRanges(JobContext context) throws IOException {
     return InputConfigurator.getRanges(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Restricts the columns that will be mapped over for this job.
    * 
@@ -357,7 +353,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
   }
-  
+
   /**
    * Gets the columns to be mapped over from this job.
    * 
@@ -370,7 +366,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
     return InputConfigurator.getFetchedColumns(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Encode an iterator on the input for this job.
    * 
@@ -383,7 +379,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void addIterator(Job job, IteratorSetting cfg) {
     InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
   }
-  
+
   /**
    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
    * 
@@ -396,7 +392,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static List<IteratorSetting> getIterators(JobContext context) {
     return InputConfigurator.getIterators(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
    * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled.
@@ -414,7 +410,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
     InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration has auto-adjust ranges enabled.
    * 
@@ -427,7 +423,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static boolean getAutoAdjustRanges(JobContext context) {
     return InputConfigurator.getAutoAdjustRanges(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Controls the use of the {@link IsolatedScanner} in this job.
    * 
@@ -443,7 +439,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setScanIsolation(Job job, boolean enableFeature) {
     InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration has isolation enabled.
    * 
@@ -456,7 +452,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static boolean isIsolated(JobContext context) {
     return InputConfigurator.isIsolated(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
    * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
@@ -473,7 +469,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setLocalIterators(Job job, boolean enableFeature) {
     InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration uses local iterators.
    * 
@@ -486,7 +482,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static boolean usesLocalIterators(JobContext context) {
     return InputConfigurator.usesLocalIterators(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * <p>
    * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
@@ -521,7 +517,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static void setOfflineTableScan(Job job, boolean enableFeature) {
     InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
   }
-  
+
   /**
    * Determines whether a configuration has the offline table scan feature enabled.
    * 
@@ -534,7 +530,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static boolean isOfflineScan(JobContext context) {
     return InputConfigurator.isOfflineScan(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
    * 
@@ -548,7 +544,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
     return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context));
   }
-  
+
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
    * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
@@ -562,7 +558,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   protected static void validateOptions(JobContext context) throws IOException {
     InputConfigurator.validateOptions(CLASS, getConfiguration(context));
   }
-  
+
   /**
    * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
    * types.
@@ -579,7 +575,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     protected long numKeysRead;
     protected Iterator<Entry<Key,Value>> scannerIterator;
     protected RangeInputSplit split;
-    
+
     /**
      * Apply the configured iterators from the configuration to the scanner.
      * 
@@ -594,7 +590,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
         scanner.addScanIterator(iterator);
       }
     }
-    
+
     /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
@@ -605,18 +601,16 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       log.debug("Initializing input split: " + split.range);
       Instance instance = getInstance(attempt);
       String principal = getPrincipal(attempt);
-      String tokenClass = getTokenClass(attempt);
-      byte[] token = getToken(attempt);
+      AuthenticationToken token = getAuthenticationToken(attempt);
       Authorizations authorizations = getScanAuthorizations(attempt);
-      
+
       try {
         log.debug("Creating connector with user: " + principal);
-        Connector conn = instance.getConnector(principal, AuthenticationTokenSerializer.deserialize(tokenClass, token));
+        Connector conn = instance.getConnector(principal, token);
         log.debug("Creating scanner for table: " + getInputTableName(attempt));
         log.debug("Authorizations are: " + authorizations);
         if (isOfflineScan(attempt)) {
-          scanner = new OfflineScanner(instance, new Credentials(principal, AuthenticationTokenSerializer.deserialize(tokenClass, token)), Tables.getTableId(
-              instance, getInputTableName(attempt)), authorizations);
+          scanner = new OfflineScanner(instance, new Credentials(principal, token), Tables.getTableId(instance, getInputTableName(attempt)), authorizations);
         } else {
           scanner = conn.createScanner(getInputTableName(attempt), authorizations);
         }
@@ -632,7 +626,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       } catch (Exception e) {
         throw new IOException(e);
       }
-      
+
       // setup a scanner within the bounds of this split
       for (Pair<Text,Text> c : getFetchedColumns(attempt)) {
         if (c.getSecond() != null) {
@@ -643,65 +637,65 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
           scanner.fetchColumnFamily(c.getFirst());
         }
       }
-      
+
       scanner.setRange(split.range);
-      
+
       numKeysRead = 0;
-      
+
       // do this last after setting all scanner options
       scannerIterator = scanner.iterator();
     }
-    
+
     @Override
     public void close() {}
-    
+
     @Override
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
       return split.getProgress(currentKey);
     }
-    
+
     protected K currentK = null;
     protected V currentV = null;
     protected Key currentKey = null;
     protected Value currentValue = null;
-    
+
     @Override
     public K getCurrentKey() throws IOException, InterruptedException {
       return currentK;
     }
-    
+
     @Override
     public V getCurrentValue() throws IOException, InterruptedException {
       return currentV;
     }
   }
-  
+
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
       AccumuloException, AccumuloSecurityException {
-    
+
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    
+
     Instance instance = getInstance(context);
-    Connector conn = instance.getConnector(getPrincipal(context), AuthenticationTokenSerializer.deserialize(getTokenClass(context), getToken(context)));
+    Connector conn = instance.getConnector(getPrincipal(context), getAuthenticationToken(context));
     String tableId = Tables.getTableId(instance, tableName);
-    
+
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
       Tables.clearCache(instance);
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
         throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
       }
     }
-    
+
     for (Range range : ranges) {
       Text startRow;
-      
+
       if (range.getStartKey() != null)
         startRow = range.getStartKey().getRow();
       else
         startRow = new Text();
-      
+
       Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
       Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
@@ -709,73 +703,73 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
       scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
       scanner.setRange(metadataRange);
-      
+
       RowIterator rowIter = new RowIterator(scanner);
-      
+
       KeyExtent lastExtent = null;
-      
+
       while (rowIter.hasNext()) {
         Iterator<Entry<Key,Value>> row = rowIter.next();
         String last = "";
         KeyExtent extent = null;
         String location = null;
-        
+
         while (row.hasNext()) {
           Entry<Key,Value> entry = row.next();
           Key key = entry.getKey();
-          
+
           if (key.getColumnFamily().equals(TabletsSection.LastLocationColumnFamily.NAME)) {
             last = entry.getValue().toString();
           }
-          
+
           if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
               || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
             location = entry.getValue().toString();
           }
-          
+
           if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
             extent = new KeyExtent(key.getRow(), entry.getValue());
           }
-          
+
         }
-        
+
         if (location != null)
           return null;
-        
+
         if (!extent.getTableId().toString().equals(tableId)) {
           throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
         }
-        
+
         if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
           throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
         }
-        
+
         Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
         if (tabletRanges == null) {
           tabletRanges = new HashMap<KeyExtent,List<Range>>();
           binnedRanges.put(last, tabletRanges);
         }
-        
+
         List<Range> rangeList = tabletRanges.get(extent);
         if (rangeList == null) {
           rangeList = new ArrayList<Range>();
           tabletRanges.put(extent, rangeList);
         }
-        
+
         rangeList.add(range);
-        
+
         if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
           break;
         }
-        
+
         lastExtent = extent;
       }
-      
+
     }
-    
+
     return binnedRanges;
   }
-  
+
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
@@ -783,16 +777,16 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public List<InputSplit> getSplits(JobContext context) throws IOException {
     log.setLevel(getLogLevel(context));
     validateOptions(context);
-    
+
     String tableName = getInputTableName(context);
     boolean autoAdjust = getAutoAdjustRanges(context);
     List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
-    
+
     if (ranges.isEmpty()) {
       ranges = new ArrayList<Range>(1);
       ranges.add(new Range());
     }
-    
+
     // get the metadata information for these ranges
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     TabletLocator tl;
@@ -810,8 +804,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
         tl = getTabletLocator(context);
         // it's possible that the cache could contain complete, but old information about a table's tablets... so clear it
         tl.invalidateCache();
-        while (!tl.binRanges(new Credentials(getPrincipal(context), AuthenticationTokenSerializer.deserialize(getTokenClass(context), getToken(context))),
-            ranges, binnedRanges).isEmpty()) {
+        while (!tl.binRanges(new Credentials(getPrincipal(context), getAuthenticationToken(context)), ranges, binnedRanges).isEmpty()) {
           if (!(instance instanceof MockInstance)) {
             if (tableId == null)
               tableId = Tables.getTableId(instance, tableName);
@@ -829,15 +822,15 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     } catch (Exception e) {
       throw new IOException(e);
     }
-    
+
     ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
     HashMap<Range,ArrayList<String>> splitsToAdd = null;
-    
+
     if (!autoAdjust)
       splitsToAdd = new HashMap<Range,ArrayList<String>>();
-    
+
     HashMap<String,String> hostNameCache = new HashMap<String,String>();
-    
+
     for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
       String ip = tserverBin.getKey().split(":", 2)[0];
       String location = hostNameCache.get(ip);
@@ -846,7 +839,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
         location = inetAddress.getHostName();
         hostNameCache.put(ip, location);
       }
-      
+
       for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
         Range ke = extentRanges.getKey().toDataRange();
         for (Range r : extentRanges.getValue()) {
@@ -864,43 +857,43 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
         }
       }
     }
-    
+
     if (!autoAdjust)
       for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
         splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
     return splits;
   }
-  
+
   /**
    * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
    */
   public static class RangeInputSplit extends InputSplit implements Writable {
     private Range range;
     private String[] locations;
-    
+
     public RangeInputSplit() {
       range = new Range();
       locations = new String[0];
     }
-    
+
     public RangeInputSplit(RangeInputSplit split) throws IOException {
       this.setRange(split.getRange());
       this.setLocations(split.getLocations());
     }
-    
+
     protected RangeInputSplit(String table, Range range, String[] locations) {
       this.range = range;
       this.locations = locations;
     }
-    
+
     public Range getRange() {
       return range;
     }
-    
+
     public void setRange(Range range) {
       this.range = range;
     }
-    
+
     private static byte[] extractBytes(ByteSequence seq, int numBytes) {
       byte[] bytes = new byte[numBytes + 1];
       bytes[0] = 0;
@@ -912,7 +905,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       }
       return bytes;
     }
-    
+
     public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
       int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
       BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
@@ -920,7 +913,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
       return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
     }
-    
+
     public float getProgress(Key currentKey) {
       if (currentKey == null)
         return 0f;
@@ -939,7 +932,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       // if we can't figure it out, then claim no progress
       return 0f;
     }
-    
+
     /**
      * This implementation of length is only an estimate; it does not provide exact values. Do not have your code rely on this return value.
      */
@@ -949,29 +942,29 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
       int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
       long diff = 0;
-      
+
       byte[] start = startRow.getBytes();
       byte[] stop = stopRow.getBytes();
       for (int i = 0; i < maxCommon; ++i) {
         diff |= 0xff & (start[i] ^ stop[i]);
         diff <<= Byte.SIZE;
       }
-      
+
       if (startRow.getLength() != stopRow.getLength())
         diff |= 0xff;
-      
+
       return diff + 1;
     }
-    
+
     @Override
     public String[] getLocations() throws IOException {
       return locations;
     }
-    
+
     public void setLocations(String[] locations) {
       this.locations = locations;
     }
-    
+
     @Override
     public void readFields(DataInput in) throws IOException {
       range.readFields(in);
@@ -980,7 +973,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       for (int i = 0; i < numLocs; ++i)
         locations[i] = in.readUTF();
     }
-    
+
     @Override
     public void write(DataOutput out) throws IOException {
       range.write(out);
@@ -989,7 +982,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
         out.writeUTF(locations[i]);
     }
   }
-  
+
   // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop 2 compatibility
   static Configuration getConfiguration(JobContext context) {
     try {
@@ -1001,5 +994,5 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       throw new RuntimeException(e);
     }
   }
-  
+
 }


Mime
View raw message