accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject svn commit: r1437073 [1/3] - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/cli/ core/src/main/java/org/apache/accumulo/core/client/mapreduce/ core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ core/src/test/java/org/ap...
Date Tue, 22 Jan 2013 18:07:17 GMT
Author: ctubbsii
Date: Tue Jan 22 18:07:17 2013
New Revision: 1437073

URL: http://svn.apache.org/viewvc?rev=1437073&view=rev
Log:
ACCUMULO-769 Modify mapreduce API to use the Hadoop static configurator conventions, but done in a way that allows us to standardize and reuse configurator code to support multiple frameworks.

Added:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/FileOutputConfigurator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/OutputConfigurator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/package-info.java
Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
    accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/CopyTool.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/MapRedVerifyTool.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java Tue Jan 22 18:07:17 2013
@@ -41,8 +41,12 @@ public class ClientOnDefaultTable extend
   @Override
   public void setAccumuloConfigs(Job job) {
     super.setAccumuloConfigs(job);
-    AccumuloInputFormat.setInputInfo(job, user, getPassword(), getTableName(), auths);
-    AccumuloOutputFormat.setOutputInfo(job, user, getPassword(), true, getTableName());
+    AccumuloInputFormat.setConnectorInfo(job, user, getPassword());
+    AccumuloInputFormat.setInputTableName(job, getTableName());
+    AccumuloInputFormat.setScanAuthorizations(job, auths);
+    AccumuloOutputFormat.setConnectorInfo(job, user, getPassword());
+    AccumuloOutputFormat.setCreateTables(job, true);
+    AccumuloOutputFormat.setDefaultTableName(job, getTableName());
   }
   
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java Tue Jan 22 18:07:17 2013
@@ -30,7 +30,11 @@ public class ClientOnRequiredTable exten
   @Override
   public void setAccumuloConfigs(Job job) {
     super.setAccumuloConfigs(job);
-    AccumuloInputFormat.setInputInfo(job, user, getPassword(), tableName, auths);
-    AccumuloOutputFormat.setOutputInfo(job, user, getPassword(), true, tableName);
+    AccumuloInputFormat.setConnectorInfo(job, user, getPassword());
+    AccumuloInputFormat.setInputTableName(job, tableName);
+    AccumuloInputFormat.setScanAuthorizations(job, auths);
+    AccumuloOutputFormat.setConnectorInfo(job, user, getPassword());
+    AccumuloOutputFormat.setCreateTables(job, true);
+    AccumuloOutputFormat.setDefaultTableName(job, tableName);
   }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java Tue Jan 22 18:07:17 2013
@@ -18,20 +18,18 @@ package org.apache.accumulo.core.client.
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.util.FileOutputConfigurator;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.commons.collections.map.LRUMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -41,6 +39,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.log4j.Logger;
 
 /**
  * This class allows MapReduce jobs to write output in the Accumulo data file format.<br />
@@ -53,70 +52,33 @@ import org.apache.hadoop.mapreduce.lib.o
  * supported at this time.
  */
 public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
-  private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName() + ".";
-  private static final String ACCUMULO_PROPERTY_PREFIX = PREFIX + "accumuloProperties.";
+  
+  private static final Class<?> CLASS = AccumuloFileOutputFormat.class;
+  protected static final Logger log = Logger.getLogger(CLASS);
   
   /**
    * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been
    * stored in the Job's configuration.
    * 
+   * @param context
+   *          the Hadoop context for the configured job
    * @since 1.5.0
-   * @see #setAccumuloProperty(Job, Property, Object)
    */
   protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) {
-    ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration());
-    for (Entry<String,String> entry : context.getConfiguration())
-      if (entry.getKey().startsWith(ACCUMULO_PROPERTY_PREFIX))
-        acuConf.set(Property.getPropertyByKey(entry.getKey().substring(ACCUMULO_PROPERTY_PREFIX.length())), entry.getValue());
-    return acuConf;
-  }
-  
-  /**
-   * The supported Accumulo properties we set in this OutputFormat, that change the behavior of the RecordWriter.<br />
-   * These properties correspond to the supported public static setter methods available to this class.
-   * 
-   * @since 1.5.0
-   */
-  protected static boolean isSupportedAccumuloProperty(Property property) {
-    switch (property) {
-      case TABLE_FILE_COMPRESSION_TYPE:
-      case TABLE_FILE_COMPRESSED_BLOCK_SIZE:
-      case TABLE_FILE_BLOCK_SIZE:
-      case TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX:
-      case TABLE_FILE_REPLICATION:
-        return true;
-      default:
-        return false;
-    }
-  }
-  
-  /**
-   * Helper for transforming Accumulo configuration properties into something that can be stored safely inside the Hadoop Job configuration.
-   * 
-   * @since 1.5.0
-   */
-  protected static <T> void setAccumuloProperty(Job job, Property property, T value) {
-    if (isSupportedAccumuloProperty(property)) {
-      String val = String.valueOf(value);
-      if (property.getType().isValidFormat(val))
-        job.getConfiguration().set(ACCUMULO_PROPERTY_PREFIX + property.getKey(), val);
-      else
-        throw new IllegalArgumentException("Value is not appropriate for property type '" + property.getType() + "'");
-    } else
-      throw new IllegalArgumentException("Unsupported configuration property " + property.getKey());
+    return FileOutputConfigurator.getAccumuloConfiguration(CLASS, context.getConfiguration());
   }
   
   /**
    * Sets the compression type to use for data blocks. Specifying a compression may require additional libraries to be available to your Job.
    * 
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param compressionType
    *          one of "none", "gz", "lzo", or "snappy"
    * @since 1.5.0
    */
   public static void setCompressionType(Job job, String compressionType) {
-    if (compressionType == null || !Arrays.asList("none", "gz", "lzo", "snappy").contains(compressionType))
-      throw new IllegalArgumentException("Compression type must be one of: none, gz, lzo, snappy");
-    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType);
+    FileOutputConfigurator.setCompressionType(CLASS, job.getConfiguration(), compressionType);
   }
   
   /**
@@ -126,38 +88,54 @@ public class AccumuloFileOutputFormat ex
    * <p>
    * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance).
    * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param dataBlockSize
+   *          the block size, in bytes
    * @since 1.5.0
    */
   public static void setDataBlockSize(Job job, long dataBlockSize) {
-    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, dataBlockSize);
+    FileOutputConfigurator.setDataBlockSize(CLASS, job.getConfiguration(), dataBlockSize);
   }
   
   /**
    * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system.
    * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param fileBlockSize
+   *          the block size, in bytes
    * @since 1.5.0
    */
   public static void setFileBlockSize(Job job, long fileBlockSize) {
-    setAccumuloProperty(job, Property.TABLE_FILE_BLOCK_SIZE, fileBlockSize);
+    FileOutputConfigurator.setFileBlockSize(CLASS, job.getConfiguration(), fileBlockSize);
   }
   
   /**
    * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy within the file, while larger blocks mean a more shallow
    * index hierarchy within the file. This can affect the performance of queries.
    * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param indexBlockSize
+   *          the block size, in bytes
    * @since 1.5.0
    */
   public static void setIndexBlockSize(Job job, long indexBlockSize) {
-    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, indexBlockSize);
+    FileOutputConfigurator.setIndexBlockSize(CLASS, job.getConfiguration(), indexBlockSize);
   }
   
   /**
    * Sets the file system replication factor for the resulting file, overriding the file system default.
    * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param replication
+   *          the number of replicas for produced files
    * @since 1.5.0
    */
   public static void setReplication(Job job, int replication) {
-    setAccumuloProperty(job, Property.TABLE_FILE_REPLICATION, replication);
+    FileOutputConfigurator.setReplication(CLASS, job.getConfiguration(), replication);
   }
   
   @Override
@@ -204,35 +182,13 @@ public class AccumuloFileOutputFormat ex
   // ----------------------------------------------------------------------------------------------------
   
   /**
-   * @deprecated since 1.5.0;
-   * @see #setZooKeeperInstance(Configuration, String, String)
-   */
-  @Deprecated
-  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + "instanceConfigured";
-  
-  /**
-   * @deprecated since 1.5.0;
-   * @see #setZooKeeperInstance(Configuration, String, String)
-   * @see #getInstance(Configuration)
-   */
-  @Deprecated
-  private static final String INSTANCE_NAME = PREFIX + "instanceName";
-  
-  /**
-   * @deprecated since 1.5.0;
-   * @see #setZooKeeperInstance(Configuration, String, String)
-   * @see #getInstance(Configuration)
-   */
-  @Deprecated
-  private static final String ZOOKEEPERS = PREFIX + "zooKeepers";
-  
-  /**
-   * @deprecated since 1.5.0; Retrieve the relevant block size from {@link #getAccumuloConfiguration(JobContext)}
+   * @deprecated since 1.5.0; Retrieve the relevant block size from {@link #getAccumuloConfiguration(JobContext)} and configure hadoop's
+   *             io.seqfile.compress.blocksize with the same value. No longer needed, as {@link RFile} does not use this field.
    */
   @Deprecated
   protected static void handleBlockSize(Configuration conf) {
     conf.setInt("io.seqfile.compress.blocksize",
-        (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+        (int) FileOutputConfigurator.getAccumuloConfiguration(CLASS, conf).getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
   }
   
   /**
@@ -246,7 +202,7 @@ public class AccumuloFileOutputFormat ex
    */
   @Deprecated
   public static void setBlockSize(Configuration conf, int blockSize) {
-    conf.set(ACCUMULO_PROPERTY_PREFIX + Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), String.valueOf(blockSize));
+    FileOutputConfigurator.setDataBlockSize(CLASS, conf, blockSize);
   }
   
   /**
@@ -255,13 +211,7 @@ public class AccumuloFileOutputFormat ex
    */
   @Deprecated
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
-    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per job");
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(instanceName, zooKeepers);
-    conf.set(INSTANCE_NAME, instanceName);
-    conf.set(ZOOKEEPERS, zooKeepers);
+    FileOutputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
   }
   
   /**
@@ -270,7 +220,7 @@ public class AccumuloFileOutputFormat ex
    */
   @Deprecated
   protected static Instance getInstance(Configuration conf) {
-    return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+    return FileOutputConfigurator.getInstance(CLASS, conf);
   }
   
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java Tue Jan 22 18:07:17 2013
@@ -21,9 +21,11 @@ import java.util.Map.Entry;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -34,8 +36,10 @@ import org.apache.hadoop.mapreduce.TaskA
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloInputFormat#setInputInfo(org.apache.hadoop.mapreduce.Job, String, byte[], String, org.apache.accumulo.core.security.Authorizations)}
- * <li>{@link AccumuloInputFormat#setZooKeeperInstance(org.apache.hadoop.mapreduce.Job, String, String)}
+ * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)}
+ * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}
+ * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)}
  * </ul>
  * 
  * Other static methods are optional.

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Tue Jan 22 18:07:17 2013
@@ -16,12 +16,7 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map.Entry;
@@ -39,14 +34,13 @@ import org.apache.accumulo.core.client.M
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.util.OutputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
-import org.apache.accumulo.core.util.ArgumentChecker;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -66,146 +60,70 @@ import org.apache.log4j.Logger;
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloOutputFormat#setOutputInfo(Job, String, byte[], boolean, String)}
- * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)}
+ * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)}
  * </ul>
  * 
  * Other static methods are optional.
  */
 public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
-  private static final Logger log = Logger.getLogger(AccumuloOutputFormat.class);
   
-  /**
-   * Prefix shared by all job configuration property names for this class
-   */
-  private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName();
+  private static final Class<?> CLASS = AccumuloOutputFormat.class;
+  protected static final Logger log = Logger.getLogger(CLASS);
   
   /**
-   * Used to limit the times a job can be configured with output information to 1
+   * Sets the connector information needed to communicate with Accumulo in this job.
    * 
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
-   * @see #getUsername(JobContext)
-   * @see #getPassword(JobContext)
-   * @see #canCreateTables(JobContext)
-   * @see #getDefaultTableName(JobContext)
-   */
-  private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
-  
-  /**
-   * Used to limit the times a job can be configured with instance information to 1
-   * 
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #setMockInstance(Job, String)
-   * @see #getInstance(JobContext)
-   */
-  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
-  
-  /**
-   * Key for storing the Accumulo user's name
-   * 
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
-   * @see #getUsername(JobContext)
-   */
-  private static final String USERNAME = PREFIX + ".username";
-  
-  /**
-   * Key for storing the Accumulo user's password
-   * 
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
-   * @see #getPassword(JobContext)
-   */
-  private static final String PASSWORD = PREFIX + ".password";
-  
-  /**
-   * Key for storing the default table to use when the output key is null
-   * 
-   * @see #getDefaultTableName(JobContext)
-   */
-  private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable";
-  
-  /**
-   * Key for storing the Accumulo instance name to connect to
-   * 
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #setMockInstance(Job, String)
-   * @see #getInstance(JobContext)
-   */
-  private static final String INSTANCE_NAME = PREFIX + ".instanceName";
-  
-  /**
-   * Key for storing the set of Accumulo zookeeper servers to communicate with
-   * 
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #getInstance(JobContext)
-   */
-  private static final String ZOOKEEPERS = PREFIX + ".zooKeepers";
-  
-  /**
-   * Key for storing the directive to use the mock instance type
-   * 
-   * @see #setMockInstance(Job, String)
-   * @see #getInstance(JobContext)
-   */
-  private static final String MOCK = PREFIX + ".useMockInstance";
-  
-  /**
-   * Key for storing the {@link BatchWriterConfig}.
-   * 
-   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
-   * @see #getBatchWriterOptions(JobContext)
-   */
-  private static final String BATCH_WRITER_CONFIG = PREFIX + ".bwConfig";
-  
-  /**
-   * Key for storing the directive to create tables that don't exist
-   * 
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
-   * @see #canCreateTables(JobContext)
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param user
+   *          a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true)
+   * @param passwd
+   *          the user's password
+   * @since 1.5.0
    */
-  private static final String CREATETABLES = PREFIX + ".createtables";
+  public static void setConnectorInfo(Job job, String user, byte[] passwd) {
+    OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), user, passwd);
+  }
   
   /**
-   * Key for storing the desired logging level
+   * Determines if the connector has been configured.
    * 
-   * @see #setLogLevel(Job, Level)
-   * @see #getLogLevel(JobContext)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the connector has been configured, false otherwise
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, String, byte[])
    */
-  private static final String LOGLEVEL = PREFIX + ".loglevel";
+  protected static Boolean isConnectorInfoSet(JobContext context) {
+    return OutputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration());
+  }
   
   /**
-   * Key for storing the directive to simulate output instead of actually writing to a file
+   * Gets the user name from the configuration.
    * 
-   * @see #setSimulationMode(Job, boolean)
-   * @see #getSimulationMode(JobContext)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the user name
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, String, byte[])
    */
-  private static final String SIMULATE = PREFIX + ".simulate";
+  protected static String getUsername(JobContext context) {
+    return OutputConfigurator.getUsername(CLASS, context.getConfiguration());
+  }
   
   /**
-   * Sets the minimum information needed to write to Accumulo in this job.
+   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; it is BASE64 encoded to
+   * provide a charset-safe conversion to a string, and is not intended to be secure.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param user
-   *          a valid Accumulo user name (user must have Table.CREATE permission)
-   * @param passwd
-   *          the user's password
-   * @param createTables
-   *          if true, the output format will create new tables as necessary. Table names can only be alpha-numeric and underscores.
-   * @param defaultTable
-   *          the table to use when the tablename is null in the write call
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the decoded user password
    * @since 1.5.0
+   * @see #setConnectorInfo(Job, String, byte[])
    */
-  public static void setOutputInfo(Job job, String user, byte[] passwd, boolean createTables, String defaultTable) {
-    if (job.getConfiguration().getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Output info can only be set once per job");
-    job.getConfiguration().setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(user, passwd);
-    job.getConfiguration().set(USERNAME, user);
-    job.getConfiguration().set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    job.getConfiguration().setBoolean(CREATETABLES, createTables);
-    if (defaultTable != null)
-      job.getConfiguration().set(DEFAULT_TABLE_NAME, defaultTable);
+  protected static byte[] getPassword(JobContext context) {
+    return OutputConfigurator.getPassword(CLASS, context.getConfiguration());
   }
   
   /**
@@ -220,13 +138,7 @@ public class AccumuloOutputFormat extend
    * @since 1.5.0
    */
   public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
-    if (job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per job");
-    job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    ArgumentChecker.notNull(instanceName, zooKeepers);
-    job.getConfiguration().set(INSTANCE_NAME, instanceName);
-    job.getConfiguration().set(ZOOKEEPERS, zooKeepers);
-    System.out.println("instance set: " + job.getConfiguration().get(INSTANCE_HAS_BEEN_SET));
+    OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
   }
   
   /**
@@ -239,32 +151,21 @@ public class AccumuloOutputFormat extend
    * @since 1.5.0
    */
   public static void setMockInstance(Job job, String instanceName) {
-    job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    job.getConfiguration().setBoolean(MOCK, true);
-    job.getConfiguration().set(INSTANCE_NAME, instanceName);
+    OutputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName);
   }
   
   /**
-   * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is
-   * used. Setting the configuration multiple times overwrites any previous configuration.
+   * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param bwConfig
-   *          the configuration for the {@link BatchWriter}
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return an Accumulo instance
    * @since 1.5.0
+   * @see #setZooKeeperInstance(Job, String, String)
+   * @see #setMockInstance(Job, String)
    */
-  public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    String serialized;
-    try {
-      bwConfig.write(new DataOutputStream(baos));
-      serialized = new String(baos.toByteArray(), Charset.forName("UTF-8"));
-      baos.close();
-    } catch (IOException e) {
-      throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName());
-    }
-    job.getConfiguration().set(BATCH_WRITER_CONFIG, serialized);
+  protected static Instance getInstance(JobContext context) {
+    return OutputConfigurator.getInstance(CLASS, context.getConfiguration());
   }
   
   /**
@@ -277,134 +178,119 @@ public class AccumuloOutputFormat extend
    * @since 1.5.0
    */
   public static void setLogLevel(Job job, Level level) {
-    ArgumentChecker.notNull(level);
-    job.getConfiguration().setInt(LOGLEVEL, level.toInt());
+    OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
   }
   
   /**
-   * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing.
-   * 
-   * <p>
-   * By default, this feature is <b>disabled</b>.
+   * Gets the log level from this configuration.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param enableFeature
-   *          the feature is enabled if true, disabled otherwise
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the log level
    * @since 1.5.0
+   * @see #setLogLevel(Job, Level)
    */
-  public static void setSimulationMode(Job job, boolean enableFeature) {
-    job.getConfiguration().setBoolean(SIMULATE, enableFeature);
+  protected static Level getLogLevel(JobContext context) {
+    return OutputConfigurator.getLogLevel(CLASS, context.getConfiguration());
   }
   
   /**
-   * Gets the user name from the configuration.
+   * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and
+   * underscores.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the user name
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param tableName
+   *          the table to use when the tablename is null in the write call
    * @since 1.5.0
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
    */
-  protected static String getUsername(JobContext context) {
-    return context.getConfiguration().get(USERNAME);
+  public static void setDefaultTableName(Job job, String tableName) {
+    OutputConfigurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName);
   }
   
   /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
+   * Gets the default table name from the configuration.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the decoded user password
+   * @return the default table name
    * @since 1.5.0
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
+   * @see #setDefaultTableName(Job, String)
    */
-  protected static byte[] getPassword(JobContext context) {
-    return Base64.decodeBase64(context.getConfiguration().get(PASSWORD, "").getBytes());
+  protected static String getDefaultTableName(JobContext context) {
+    return OutputConfigurator.getDefaultTableName(CLASS, context.getConfiguration());
   }
   
   /**
-   * Determines whether tables are permitted to be created as needed.
+   * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is
+   * used. Setting the configuration multiple times overwrites any previous configuration.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return true if the feature is disabled, false otherwise
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param bwConfig
+   *          the configuration for the {@link BatchWriter}
    * @since 1.5.0
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
    */
-  protected static boolean canCreateTables(JobContext context) {
-    return context.getConfiguration().getBoolean(CREATETABLES, false);
+  public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
+    OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig);
   }
   
   /**
-   * Gets the default table name from the configuration.
+   * Gets the {@link BatchWriterConfig} settings.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the default table name
+   * @return the configuration object
    * @since 1.5.0
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
+   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
    */
-  protected static String getDefaultTableName(JobContext context) {
-    return context.getConfiguration().get(DEFAULT_TABLE_NAME);
+  protected static BatchWriterConfig getBatchWriterOptions(JobContext context) {
+    return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration());
   }
   
   /**
-   * Initializes an Accumulo {@link Instance} based on the configuration.
+   * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return an Accumulo instance
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #setMockInstance(Job, String)
    */
-  protected static Instance getInstance(JobContext context) {
-    if (context.getConfiguration().getBoolean(MOCK, false))
-      return new MockInstance(context.getConfiguration().get(INSTANCE_NAME));
-    return new ZooKeeperInstance(context.getConfiguration().get(INSTANCE_NAME), context.getConfiguration().get(ZOOKEEPERS));
+  public static void setCreateTables(Job job, boolean enableFeature) {
+    OutputConfigurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
-   * Gets the {@link BatchWriterConfig} settings.
+   * Determines whether tables are permitted to be created as needed.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the configuration object
+   * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
-   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
+   * @see #setCreateTables(Job, boolean)
    */
-  protected static BatchWriterConfig getBatchWriterOptions(JobContext context) {
-    String serialized = context.getConfiguration().get(BATCH_WRITER_CONFIG);
-    BatchWriterConfig bwConfig = new BatchWriterConfig();
-    if (serialized == null || serialized.isEmpty()) {
-      return bwConfig;
-    } else {
-      try {
-        ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(Charset.forName("UTF-8")));
-        bwConfig.readFields(new DataInputStream(bais));
-        bais.close();
-        return bwConfig;
-      } catch (IOException e) {
-        throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName());
-      }
-    }
+  protected static Boolean canCreateTables(JobContext context) {
+    return OutputConfigurator.canCreateTables(CLASS, context.getConfiguration());
   }
   
   /**
-   * Gets the log level from this configuration.
+   * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the log level
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
-   * @see #setLogLevel(Job, Level)
    */
-  protected static Level getLogLevel(JobContext context) {
-    if (context.getConfiguration().get(LOGLEVEL) != null)
-      return Level.toLevel(context.getConfiguration().getInt(LOGLEVEL, Level.INFO.toInt()));
-    return null;
+  public static void setSimulationMode(Job job, boolean enableFeature) {
+    OutputConfigurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
@@ -416,8 +302,8 @@ public class AccumuloOutputFormat extend
    * @since 1.5.0
    * @see #setSimulationMode(Job, boolean)
    */
-  protected static boolean getSimulationMode(JobContext context) {
-    return context.getConfiguration().getBoolean(SIMULATE, false);
+  protected static Boolean getSimulationMode(JobContext context) {
+    return OutputConfigurator.getSimulationMode(CLASS, context.getConfiguration());
   }
   
   /**
@@ -582,11 +468,10 @@ public class AccumuloOutputFormat extend
   
   @Override
   public void checkOutputSpecs(JobContext job) throws IOException {
-    if (!job.getConfiguration().getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
-      throw new IOException("Output info has not been set.");
-    if (!job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IOException("Instance info has not been set.");
+    if (!isConnectorInfoSet(job))
+      throw new IOException("Connector info has not been set.");
     try {
+      // if the instance isn't configured, it will complain here
       Connector c = getInstance(job).getConnector(getUsername(job), getPassword(job));
       if (!c.securityOperations().authenticateUser(getUsername(job), getPassword(job)))
         throw new IOException("Unable to authenticate user");
@@ -616,38 +501,14 @@ public class AccumuloOutputFormat extend
   // ----------------------------------------------------------------------------------------------------
   
   /**
-   * @deprecated since 1.5.0;
-   */
-  @Deprecated
-  private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory";
-  
-  /**
-   * @deprecated since 1.5.0;
-   */
-  @Deprecated
-  private static final String MAX_LATENCY = PREFIX + ".maxlatency";
-  
-  /**
-   * @deprecated since 1.5.0;
-   */
-  @Deprecated
-  private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads";
-  
-  /**
-   * @deprecated since 1.5.0; Use {@link #setOutputInfo(Job, String, byte[], boolean, String)} instead.
+   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, byte[])}, {@link #setCreateTables(Job, boolean)}, and
+   *             {@link #setDefaultTableName(Job, String)} instead.
    */
   @Deprecated
   public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) {
-    if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Output info can only be set once per job");
-    conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(user, passwd);
-    conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    conf.setBoolean(CREATETABLES, createTables);
-    if (defaultTable != null)
-      conf.set(DEFAULT_TABLE_NAME, defaultTable);
+    OutputConfigurator.setConnectorInfo(CLASS, conf, user, passwd);
+    OutputConfigurator.setCreateTables(CLASS, conf, createTables);
+    OutputConfigurator.setDefaultTableName(CLASS, conf, defaultTable);
   }
   
   /**
@@ -655,13 +516,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
-    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per job");
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(instanceName, zooKeepers);
-    conf.set(INSTANCE_NAME, instanceName);
-    conf.set(ZOOKEEPERS, zooKeepers);
+    OutputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
   }
   
   /**
@@ -669,9 +524,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   public static void setMockInstance(Configuration conf, String instanceName) {
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    conf.setBoolean(MOCK, true);
-    conf.set(INSTANCE_NAME, instanceName);
+    OutputConfigurator.setMockInstance(CLASS, conf, instanceName);
   }
   
   /**
@@ -679,7 +532,9 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
-    conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes);
+    BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf);
+    bwConfig.setMaxMemory(numberOfBytes);
+    OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig);
   }
   
   /**
@@ -687,7 +542,9 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
-    conf.setInt(MAX_LATENCY, numberOfMilliseconds);
+    BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf);
+    bwConfig.setMaxLatency(numberOfMilliseconds, TimeUnit.MILLISECONDS);
+    OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig);
   }
   
   /**
@@ -695,7 +552,9 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
-    conf.setInt(NUM_WRITE_THREADS, numberOfThreads);
+    BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf);
+    bwConfig.setMaxWriteThreads(numberOfThreads);
+    OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig);
   }
   
   /**
@@ -703,8 +562,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   public static void setLogLevel(Configuration conf, Level level) {
-    ArgumentChecker.notNull(level);
-    conf.setInt(LOGLEVEL, level.toInt());
+    OutputConfigurator.setLogLevel(CLASS, conf, level);
   }
   
   /**
@@ -712,7 +570,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   public static void setSimulationMode(Configuration conf) {
-    conf.setBoolean(SIMULATE, true);
+    OutputConfigurator.setSimulationMode(CLASS, conf, true);
   }
   
   /**
@@ -720,7 +578,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static String getUsername(Configuration conf) {
-    return conf.get(USERNAME);
+    return OutputConfigurator.getUsername(CLASS, conf);
   }
   
   /**
@@ -728,7 +586,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+    return OutputConfigurator.getPassword(CLASS, conf);
   }
   
   /**
@@ -736,7 +594,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static boolean canCreateTables(Configuration conf) {
-    return conf.getBoolean(CREATETABLES, false);
+    return OutputConfigurator.canCreateTables(CLASS, conf);
   }
   
   /**
@@ -744,7 +602,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static String getDefaultTableName(Configuration conf) {
-    return conf.get(DEFAULT_TABLE_NAME);
+    return OutputConfigurator.getDefaultTableName(CLASS, conf);
   }
   
   /**
@@ -752,9 +610,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static Instance getInstance(Configuration conf) {
-    if (conf.getBoolean(MOCK, false))
-      return new MockInstance(conf.get(INSTANCE_NAME));
-    return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+    return OutputConfigurator.getInstance(CLASS, conf);
   }
   
   /**
@@ -762,7 +618,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static long getMaxMutationBufferSize(Configuration conf) {
-    return conf.getLong(MAX_MUTATION_BUFFER_SIZE, new BatchWriterConfig().getMaxMemory());
+    return OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxMemory();
   }
   
   /**
@@ -770,9 +626,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static int getMaxLatency(Configuration conf) {
-    Long maxLatency = new BatchWriterConfig().getMaxLatency(TimeUnit.MILLISECONDS);
-    Integer max = maxLatency >= Integer.MAX_VALUE ? Integer.MAX_VALUE : Integer.parseInt(Long.toString(maxLatency));
-    return conf.getInt(MAX_LATENCY, max);
+    return (int) OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxLatency(TimeUnit.MILLISECONDS);
   }
   
   /**
@@ -780,7 +634,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static int getMaxWriteThreads(Configuration conf) {
-    return conf.getInt(NUM_WRITE_THREADS, new BatchWriterConfig().getMaxWriteThreads());
+    return OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxWriteThreads();
   }
   
   /**
@@ -788,9 +642,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static Level getLogLevel(Configuration conf) {
-    if (conf.get(LOGLEVEL) != null)
-      return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
-    return null;
+    return OutputConfigurator.getLogLevel(CLASS, conf);
   }
   
   /**
@@ -798,7 +650,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static boolean getSimulationMode(Configuration conf) {
-    return conf.getBoolean(SIMULATE, false);
+    return OutputConfigurator.getSimulationMode(CLASS, conf);
   }
   
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java Tue Jan 22 18:07:17 2013
@@ -22,10 +22,12 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -36,8 +38,10 @@ import org.apache.hadoop.mapreduce.TaskA
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloRowInputFormat#setInputInfo(org.apache.hadoop.mapreduce.Job, String, byte[], String, org.apache.accumulo.core.security.Authorizations)}
- * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(org.apache.hadoop.mapreduce.Job, String, String)}
+ * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)}
+ * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)}
+ * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)}
  * </ul>
  * 
  * Other static methods are optional.



Mime
View raw message