accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From md...@apache.org
Subject [10/12] ACCUMULO-1880 create mapreduce module
Date Mon, 21 Apr 2014 21:20:10 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
deleted file mode 100644
index e58e350..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.ClientSideIteratorScanner;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.impl.TabletLocator;
-import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs.
- * <p>
- * Subclasses must implement a {@link #createRecordReader(InputSplit, TaskAttemptContext)} to provide a {@link RecordReader} for K,V.
- * <p>
- * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value} pairs, but one must implement its
- * {@link RecordReaderBase#nextKeyValue()} to transform them to the desired generic types K,V.
- * <p>
- * The static setters and getters declared here configure a single input table; each of them delegates to {@link InputConfigurator}.
- * <p>
- * See {@link AccumuloInputFormat} for an example implementation.
- */
-public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> {
-
-  /**
-   * Gets the table name from the configuration.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the table name
-   * @since 1.5.0
-   * @see #setInputTableName(Job, String)
-   */
-  protected static String getInputTableName(JobContext context) {
-    return InputConfigurator.getInputTableName(CLASS, getConfiguration(context));
-  }
-
-  /**
-   * Sets the name of the input table, over which this job will scan.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param tableName
-   *          the name of the table this job will scan
-   * @since 1.5.0
-   * @see #getInputTableName(JobContext)
-   */
-  public static void setInputTableName(Job job, String tableName) {
-    InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
-  }
-
-  /**
-   * Sets the input ranges to scan for the single input table associated with this job.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param ranges
-   *          the ranges that will be mapped over
-   * @since 1.5.0
-   * @see #getRanges(JobContext)
-   */
-  public static void setRanges(Job job, Collection<Range> ranges) {
-    InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
-  }
-
-  /**
-   * Gets the ranges to scan over from a job.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the ranges
-   * @throws IOException
-   *           propagated from {@link InputConfigurator#getRanges}
-   * @since 1.5.0
-   * @see #setRanges(Job, Collection)
-   */
-  protected static List<Range> getRanges(JobContext context) throws IOException {
-    return InputConfigurator.getRanges(CLASS, getConfiguration(context));
-  }
-
-  /**
-   * Restricts the columns that will be mapped over for this job for the default input table.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param columnFamilyColumnQualifierPairs
-   *          pairs of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
-   *          selected. An empty set is the default and is equivalent to scanning all columns.
-   * @since 1.5.0
-   * @see #getFetchedColumns(JobContext)
-   */
-  public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
-    InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
-  }
-
-  /**
-   * Gets the columns to be mapped over from this job.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return a set of columns
-   * @since 1.5.0
-   * @see #fetchColumns(Job, Collection)
-   */
-  protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
-    return InputConfigurator.getFetchedColumns(CLASS, getConfiguration(context));
-  }
-
-  /**
-   * Encode an iterator on the single input table for this job.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param cfg
-   *          the configuration of the iterator
-   * @since 1.5.0
-   * @see #getIterators(JobContext)
-   */
-  public static void addIterator(Job job, IteratorSetting cfg) {
-    InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
-  }
-
-  /**
-   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return a list of iterators
-   * @since 1.5.0
-   * @see #addIterator(Job, IteratorSetting)
-   */
-  protected static List<IteratorSetting> getIterators(JobContext context) {
-    return InputConfigurator.getIterators(CLASS, getConfiguration(context));
-  }
-
-  /**
-   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
-   * Disabling this feature will cause exactly one Map task to be created for each specified range.
-   *
-   * <p>
-   * By default, this feature is <b>enabled</b>.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param enableFeature
-   *          the feature is enabled if true, disabled otherwise
-   * @see #setRanges(Job, Collection)
-   * @since 1.5.0
-   */
-  public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
-    InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
-  }
-
-  /**
-   * Determines whether a configuration has auto-adjust ranges enabled.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return false if the feature is disabled, true otherwise
-   * @since 1.5.0
-   * @see #setAutoAdjustRanges(Job, boolean)
-   */
-  protected static boolean getAutoAdjustRanges(JobContext context) {
-    return InputConfigurator.getAutoAdjustRanges(CLASS, getConfiguration(context));
-  }
-
-  /**
-   * Controls the use of the {@link IsolatedScanner} in this job.
-   *
-   * <p>
-   * By default, this feature is <b>disabled</b>.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param enableFeature
-   *          the feature is enabled if true, disabled otherwise
-   * @since 1.5.0
-   * @see #isIsolated(JobContext)
-   */
-  public static void setScanIsolation(Job job, boolean enableFeature) {
-    InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
-  }
-
-  /**
-   * Determines whether a configuration has isolation enabled.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return true if the feature is enabled, false otherwise
-   * @since 1.5.0
-   * @see #setScanIsolation(Job, boolean)
-   */
-  protected static boolean isIsolated(JobContext context) {
-    return InputConfigurator.isIsolated(CLASS, getConfiguration(context));
-  }
-
-  /**
-   * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
-   * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
-   *
-   * <p>
-   * By default, this feature is <b>disabled</b>.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param enableFeature
-   *          the feature is enabled if true, disabled otherwise
-   * @since 1.5.0
-   * @see #usesLocalIterators(JobContext)
-   */
-  public static void setLocalIterators(Job job, boolean enableFeature) {
-    InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
-  }
-
-  /**
-   * Determines whether a configuration uses local iterators.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return true if the feature is enabled, false otherwise
-   * @since 1.5.0
-   * @see #setLocalIterators(Job, boolean)
-   */
-  protected static boolean usesLocalIterators(JobContext context) {
-    return InputConfigurator.usesLocalIterators(CLASS, getConfiguration(context));
-  }
-
-  /**
-   * <p>
-   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
-   * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
-   * fail.
-   *
-   * <p>
-   * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
-   *
-   * <p>
-   * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
-   * on the mapper's classpath.
-   *
-   * <p>
-   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
-   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
-   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
-   *
-   * <p>
-   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
-   * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
-   *
-   * <p>
-   * By default, this feature is <b>disabled</b>.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param enableFeature
-   *          the feature is enabled if true, disabled otherwise
-   * @since 1.5.0
-   * @see #isOfflineScan(JobContext)
-   */
-  public static void setOfflineTableScan(Job job, boolean enableFeature) {
-    InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
-  }
-
-  /**
-   * Determines whether a configuration has the offline table scan feature enabled.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return true if the feature is enabled, false otherwise
-   * @since 1.5.0
-   * @see #setOfflineTableScan(Job, boolean)
-   */
-  protected static boolean isOfflineScan(JobContext context) {
-    return InputConfigurator.isOfflineScan(CLASS, getConfiguration(context));
-  }
-
-  /**
-   * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. The locator is requested for the table
-   * returned by {@link #getInputTableName(JobContext)}.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return an Accumulo tablet locator
-   * @throws org.apache.accumulo.core.client.TableNotFoundException
-   *           if the table name set on the configuration doesn't exist
-   * @since 1.5.0
-   * @deprecated since 1.6.0
-   */
-  @Deprecated
-  protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
-    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), getInputTableName(context));
-  }
-
-  /**
-   * Record reader base class that attaches the configured scan iterators to a {@link Scanner}. Iterators carried by the input split take precedence over
-   * those stored in the job configuration.
-   */
-  protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V> {
-
-    /**
-     * Applies the configured iterators to the scanner. This implementation does not consult {@code tableName}; it simply delegates to
-     * {@link #setupIterators(TaskAttemptContext, Scanner, org.apache.accumulo.core.client.mapreduce.RangeInputSplit)}.
-     *
-     * @param context
-     *          the Hadoop context for the configured job
-     * @param scanner
-     *          the scanner to configure
-     * @param tableName
-     *          the name of the table being scanned; ignored by this implementation
-     * @param split
-     *          the split being read, whose iterators are preferred when present; may be null
-     * @since 1.6.0
-     */
-    @Override
-    protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
-      setupIterators(context, scanner, split);
-    }
-
-    /**
-     * Applies the iterators stored in the job configuration to the scanner. No split is consulted.
-     *
-     * @param context
-     *          the Hadoop context for the configured job
-     * @param scanner
-     *          the scanner to configure
-     * @deprecated prefer {@link #setupIterators(TaskAttemptContext, Scanner, org.apache.accumulo.core.client.mapreduce.RangeInputSplit)}, which also honors
-     *             iterators stored on the split
-     */
-    @Deprecated
-    protected void setupIterators(TaskAttemptContext context, Scanner scanner) {
-      setupIterators(context, scanner, null);
-    }
-
-    /**
-     * Adds scan iterators to the scanner. The iterators returned by {@code split.getIterators()} are used when the split is non-null and that list is
-     * non-null; otherwise the iterators from the job configuration are used.
-     *
-     * @param context
-     *          the Hadoop context for the configured job
-     * @param scanner
-     *          the scanner to configure
-     * @param split
-     *          the split being read; may be null
-     */
-    protected void setupIterators(TaskAttemptContext context, Scanner scanner, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
-      List<IteratorSetting> iterators = (split == null) ? null : split.getIterators();
-      if (iterators == null) {
-        // nothing stored on the split (or no split at all): fall back to the job-level settings
-        iterators = getIterators(context);
-      }
-      for (IteratorSetting iterator : iterators) {
-        scanner.addScanIterator(iterator);
-      }
-    }
-  }
-
-  /**
-   * Nested alias that adds no state or behavior of its own; every constructor forwards to the top-level class of the same name.
-   *
-   * @deprecated since 1.5.2; Use {@link org.apache.accumulo.core.client.mapreduce.RangeInputSplit} instead.
-   * @see org.apache.accumulo.core.client.mapreduce.RangeInputSplit
-   */
-  @Deprecated
-  public static class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.RangeInputSplit {
-
-    /**
-     * Creates a split through the superclass no-argument constructor.
-     */
-    public RangeInputSplit() {
-      super();
-    }
-
-    /**
-     * Creates a split from another one by forwarding it to the superclass constructor.
-     *
-     * @param other
-     *          the split passed to the superclass
-     * @throws IOException
-     *           propagated from the superclass constructor
-     */
-    public RangeInputSplit(RangeInputSplit other) throws IOException {
-      super(other);
-    }
-
-    /**
-     * Creates a split without a table id; an empty string is passed to the superclass in its place.
-     *
-     * @param table
-     *          the table name
-     * @param range
-     *          the range covered by the split
-     * @param locations
-     *          the locations reported for the split
-     */
-    protected RangeInputSplit(String table, Range range, String[] locations) {
-      super(table, "", range, locations);
-    }
-
-    /**
-     * Creates a split for the given table, table id, range and locations.
-     *
-     * @param table
-     *          the table name
-     * @param tableId
-     *          the table id
-     * @param range
-     *          the range covered by the split
-     * @param locations
-     *          the locations reported for the split
-     */
-    public RangeInputSplit(String table, String tableId, Range range, String[] locations) {
-      super(table, tableId, range, locations);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java
deleted file mode 100644
index e59451e..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-/**
- * This class holds a batch scan configuration for a table. It contains all the properties needed to specify how rows should be returned from the table.
- * <p>
- * The serialized form produced by {@link #write(DataOutput)} carries every field, including the offline-scan flag, so {@link #readFields(DataInput)} must be
- * given data written by the same version of this class.
- */
-public class InputTableConfig implements Writable {
-
-  // null means "not set"; the getters substitute an empty collection
-  private List<IteratorSetting> iterators;
-  private List<Range> ranges;
-  private Collection<Pair<Text,Text>> columns;
-
-  private boolean autoAdjustRanges = true;
-  private boolean useLocalIterators = false;
-  private boolean useIsolatedScanners = false;
-  private boolean offlineScan = false;
-
-  /**
-   * Creates a configuration with no iterators, ranges or columns, auto-adjust ranges enabled and all other features disabled.
-   */
-  public InputTableConfig() {}
-
-  /**
-   * Creates a batch scan config object out of a previously serialized batch scan config object.
-   *
-   * @param input
-   *          the data input of the serialized batch scan config
-   * @throws IOException
-   *           if {@link #readFields(DataInput)} fails
-   */
-  public InputTableConfig(DataInput input) throws IOException {
-    readFields(input);
-  }
-
-  /**
-   * Sets the input ranges to scan for the table described by this configuration. The list is stored as given, not copied.
-   *
-   * @param ranges
-   *          the ranges that will be mapped over
-   * @return this configuration, to allow call chaining
-   * @since 1.6.0
-   */
-  public InputTableConfig setRanges(List<Range> ranges) {
-    this.ranges = ranges;
-    return this;
-  }
-
-  /**
-   * Returns the ranges to be queried in the configuration.
-   *
-   * @return the configured ranges, or a new empty list if none have been set
-   */
-  public List<Range> getRanges() {
-    return ranges != null ? ranges : new ArrayList<Range>();
-  }
-
-  /**
-   * Restricts the columns that will be mapped over for the table described by this configuration.
-   *
-   * @param columns
-   *          pairs of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
-   *          selected. An empty set is the default and is equivalent to scanning all columns.
-   * @return this configuration, to allow call chaining
-   * @since 1.6.0
-   */
-  public InputTableConfig fetchColumns(Collection<Pair<Text,Text>> columns) {
-    this.columns = columns;
-    return this;
-  }
-
-  /**
-   * Returns the columns to be fetched for this configuration.
-   *
-   * @return the configured columns, or a new empty set if none have been set
-   */
-  public Collection<Pair<Text,Text>> getFetchedColumns() {
-    return columns != null ? columns : new HashSet<Pair<Text,Text>>();
-  }
-
-  /**
-   * Sets the iterators to be used in the query.
-   *
-   * @param iterators
-   *          the configurations for the iterators
-   * @return this configuration, to allow call chaining
-   * @since 1.6.0
-   */
-  public InputTableConfig setIterators(List<IteratorSetting> iterators) {
-    this.iterators = iterators;
-    return this;
-  }
-
-  /**
-   * Returns the iterators to be set on this configuration.
-   *
-   * @return the configured iterators, or a new empty list if none have been set
-   */
-  public List<IteratorSetting> getIterators() {
-    return iterators != null ? iterators : new ArrayList<IteratorSetting>();
-  }
-
-  /**
-   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
-   * Disabling this feature will cause exactly one Map task to be created for each specified range.
-   *
-   * <p>
-   * By default, this feature is <b>enabled</b>.
-   *
-   * @param autoAdjustRanges
-   *          the feature is enabled if true, disabled otherwise
-   * @return this configuration, to allow call chaining
-   * @see #setRanges(java.util.List)
-   * @since 1.6.0
-   */
-  public InputTableConfig setAutoAdjustRanges(boolean autoAdjustRanges) {
-    this.autoAdjustRanges = autoAdjustRanges;
-    return this;
-  }
-
-  /**
-   * Determines whether a configuration has auto-adjust ranges enabled.
-   *
-   * @return false if the feature is disabled, true otherwise
-   * @since 1.6.0
-   * @see #setAutoAdjustRanges(boolean)
-   */
-  public boolean shouldAutoAdjustRanges() {
-    return autoAdjustRanges;
-  }
-
-  /**
-   * Controls the use of the {@link org.apache.accumulo.core.client.ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack
-   * to be constructed within the Map task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be
-   * available on the classpath for the task.
-   *
-   * <p>
-   * By default, this feature is <b>disabled</b>.
-   *
-   * @param useLocalIterators
-   *          the feature is enabled if true, disabled otherwise
-   * @return this configuration, to allow call chaining
-   * @since 1.6.0
-   */
-  public InputTableConfig setUseLocalIterators(boolean useLocalIterators) {
-    this.useLocalIterators = useLocalIterators;
-    return this;
-  }
-
-  /**
-   * Determines whether a configuration uses local iterators.
-   *
-   * @return true if the feature is enabled, false otherwise
-   * @since 1.6.0
-   * @see #setUseLocalIterators(boolean)
-   */
-  public boolean shouldUseLocalIterators() {
-    return useLocalIterators;
-  }
-
-  /**
-   * <p>
-   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
-   * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
-   * fail.
-   *
-   * <p>
-   * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
-   *
-   * <p>
-   * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
-   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
-   *
-   * <p>
-   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
-   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
-   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
-   *
-   * <p>
-   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
-   * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
-   *
-   * <p>
-   * By default, this feature is <b>disabled</b>.
-   *
-   * @param offlineScan
-   *          the feature is enabled if true, disabled otherwise
-   * @return this configuration, to allow call chaining
-   * @since 1.6.0
-   */
-  public InputTableConfig setOfflineScan(boolean offlineScan) {
-    this.offlineScan = offlineScan;
-    return this;
-  }
-
-  /**
-   * Determines whether a configuration has the offline table scan feature enabled.
-   *
-   * @return true if the feature is enabled, false otherwise
-   * @since 1.6.0
-   * @see #setOfflineScan(boolean)
-   */
-  public boolean isOfflineScan() {
-    return offlineScan;
-  }
-
-  /**
-   * Controls the use of the {@link org.apache.accumulo.core.client.IsolatedScanner} in this job.
-   *
-   * <p>
-   * By default, this feature is <b>disabled</b>.
-   *
-   * @param useIsolatedScanners
-   *          the feature is enabled if true, disabled otherwise
-   * @return this configuration, to allow call chaining
-   * @since 1.6.0
-   */
-  public InputTableConfig setUseIsolatedScanners(boolean useIsolatedScanners) {
-    this.useIsolatedScanners = useIsolatedScanners;
-    return this;
-  }
-
-  /**
-   * Determines whether a configuration has isolation enabled.
-   *
-   * @return true if the feature is enabled, false otherwise
-   * @since 1.6.0
-   * @see #setUseIsolatedScanners(boolean)
-   */
-  public boolean shouldUseIsolatedScanners() {
-    return useIsolatedScanners;
-  }
-
-  /**
-   * Writes the state for the current object out to the specified {@link DataOutput}.
-   * <p>
-   * Layout: iterator count followed by each iterator; range count followed by each range; column count followed, per column, by an element count (1 for
-   * family only, 2 for family and qualifier) and the elements; then the four boolean flags in the order auto-adjust ranges, local iterators, isolated
-   * scanners, offline scan. An unset (null) collection is written as a count of zero.
-   *
-   * @param dataOutput
-   *          the output for which to write the object's state
-   * @throws IOException
-   *           if writing to the output fails
-   */
-  @Override
-  public void write(DataOutput dataOutput) throws IOException {
-    if (iterators != null) {
-      dataOutput.writeInt(iterators.size());
-      for (IteratorSetting setting : iterators) {
-        setting.write(dataOutput);
-      }
-    } else {
-      dataOutput.writeInt(0);
-    }
-    if (ranges != null) {
-      dataOutput.writeInt(ranges.size());
-      for (Range range : ranges) {
-        range.write(dataOutput);
-      }
-    } else {
-      dataOutput.writeInt(0);
-    }
-    if (columns != null) {
-      dataOutput.writeInt(columns.size());
-      for (Pair<Text,Text> column : columns) {
-        Text qualifier = column.getSecond();
-        // the element count tells readFields whether a qualifier follows the family
-        dataOutput.writeInt(qualifier == null ? 1 : 2);
-        column.getFirst().write(dataOutput);
-        if (qualifier != null) {
-          qualifier.write(dataOutput);
-        }
-      }
-    } else {
-      dataOutput.writeInt(0);
-    }
-    dataOutput.writeBoolean(autoAdjustRanges);
-    dataOutput.writeBoolean(useLocalIterators);
-    dataOutput.writeBoolean(useIsolatedScanners);
-    // offlineScan takes part in equals/hashCode, so it has to survive serialization as well
-    dataOutput.writeBoolean(offlineScan);
-  }
-
-  /**
-   * Reads the fields in the {@link DataInput} into the current object, replacing any state it held before. The input must use the layout produced by
-   * {@link #write(DataOutput)}. A collection whose serialized count is zero is left unset (null).
-   *
-   * @param dataInput
-   *          the input fields to read into the current object
-   * @throws IOException
-   *           if reading from the input fails, or if a column entry declares an element count other than 1 or 2
-   */
-  @Override
-  public void readFields(DataInput dataInput) throws IOException {
-    // load iterators; assigning null for a zero count keeps a reused instance from retaining stale entries
-    int iterSize = dataInput.readInt();
-    iterators = iterSize > 0 ? new ArrayList<IteratorSetting>() : null;
-    for (int i = 0; i < iterSize; i++) {
-      iterators.add(new IteratorSetting(dataInput));
-    }
-    // load ranges
-    int rangeSize = dataInput.readInt();
-    ranges = rangeSize > 0 ? new ArrayList<Range>() : null;
-    for (int i = 0; i < rangeSize; i++) {
-      Range range = new Range();
-      range.readFields(dataInput);
-      ranges.add(range);
-    }
-    // load columns
-    int columnSize = dataInput.readInt();
-    columns = columnSize > 0 ? new HashSet<Pair<Text,Text>>() : null;
-    for (int i = 0; i < columnSize; i++) {
-      int numPairs = dataInput.readInt();
-      if (numPairs != 1 && numPairs != 2) {
-        // write() only ever emits 1 or 2; anything else means the stream is corrupt or out of sync
-        throw new IOException("Unexpected number of column elements: " + numPairs);
-      }
-      Text colFam = new Text();
-      colFam.readFields(dataInput);
-      Text colQual = null;
-      if (numPairs == 2) {
-        colQual = new Text();
-        colQual.readFields(dataInput);
-      }
-      columns.add(new Pair<Text,Text>(colFam, colQual));
-    }
-    autoAdjustRanges = dataInput.readBoolean();
-    useLocalIterators = dataInput.readBoolean();
-    useIsolatedScanners = dataInput.readBoolean();
-    offlineScan = dataInput.readBoolean();
-  }
-
-  /**
-   * Compares the four boolean flags and the three collection fields. Unset (null) collections are only equal to unset collections.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    InputTableConfig that = (InputTableConfig) o;
-
-    if (autoAdjustRanges != that.autoAdjustRanges) {
-      return false;
-    }
-    if (offlineScan != that.offlineScan) {
-      return false;
-    }
-    if (useIsolatedScanners != that.useIsolatedScanners) {
-      return false;
-    }
-    if (useLocalIterators != that.useLocalIterators) {
-      return false;
-    }
-    if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
-      return false;
-    }
-    if (iterators != null ? !iterators.equals(that.iterators) : that.iterators != null) {
-      return false;
-    }
-    if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Combines the same seven fields that {@link #equals(Object)} compares.
-   */
-  @Override
-  public int hashCode() {
-    int result = 31 * (iterators != null ? iterators.hashCode() : 0);
-    result = 31 * result + (ranges != null ? ranges.hashCode() : 0);
-    result = 31 * result + (columns != null ? columns.hashCode() : 0);
-    result = 31 * result + (autoAdjustRanges ? 1 : 0);
-    result = 31 * result + (useLocalIterators ? 1 : 0);
-    result = 31 * result + (useIsolatedScanners ? 1 : 0);
-    result = 31 * result + (offlineScan ? 1 : 0);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
deleted file mode 100644
index 4b5a149..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
-import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.log4j.Level;
-
-/**
- * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
- */
-public class RangeInputSplit extends InputSplit implements Writable {
-  private Range range;
-  private String[] locations;
-  private String tableId, tableName, instanceName, zooKeepers, principal;
-  private TokenSource tokenSource;
-  private String tokenFile;
-  private AuthenticationToken token;
-  private Boolean offline, mockInstance, isolatedScan, localIterators;
-  private Authorizations auths;
-  private Set<Pair<Text,Text>> fetchedColumns;
-  private List<IteratorSetting> iterators;
-  private Level level;
-
-  public RangeInputSplit() {
-    range = new Range();
-    locations = new String[0];
-    tableName = "";
-    tableId = "";
-  }
-
-  public RangeInputSplit(RangeInputSplit split) throws IOException {
-    this.setRange(split.getRange());
-    this.setLocations(split.getLocations());
-    this.setTableName(split.getTableName());
-    this.setTableId(split.getTableId());
-  }
-
-  protected RangeInputSplit(String table, String tableId, Range range, String[] locations) {
-    this.range = range;
-    setLocations(locations);
-    this.tableName = table;
-    this.tableId = tableId;
-  }
-
-  public Range getRange() {
-    return range;
-  }
-
-  private static byte[] extractBytes(ByteSequence seq, int numBytes) {
-    byte[] bytes = new byte[numBytes + 1];
-    bytes[0] = 0;
-    for (int i = 0; i < numBytes; i++) {
-      if (i >= seq.length())
-        bytes[i + 1] = 0;
-      else
-        bytes[i + 1] = seq.byteAt(i);
-    }
-    return bytes;
-  }
-
-  public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
-    int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
-    BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
-    BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
-    BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
-    return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
-  }
-
-  public float getProgress(Key currentKey) {
-    if (currentKey == null)
-      return 0f;
-    if (range.getStartKey() != null && range.getEndKey() != null) {
-      if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
-        // just look at the row progress
-        return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
-      } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) {
-        // just look at the column family progress
-        return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
-      } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) {
-        // just look at the column qualifier progress
-        return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
-      }
-    }
-    // if we can't figure it out, then claim no progress
-    return 0f;
-  }
-
-  /**
-   * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value.
-   */
-  @Override
-  public long getLength() throws IOException {
-    Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
-    Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
-    int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
-    long diff = 0;
-
-    byte[] start = startRow.getBytes();
-    byte[] stop = stopRow.getBytes();
-    for (int i = 0; i < maxCommon; ++i) {
-      diff |= 0xff & (start[i] ^ stop[i]);
-      diff <<= Byte.SIZE;
-    }
-
-    if (startRow.getLength() != stopRow.getLength())
-      diff |= 0xff;
-
-    return diff + 1;
-  }
-
-  @Override
-  public String[] getLocations() throws IOException {
-    return Arrays.copyOf(locations, locations.length);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    range.readFields(in);
-    tableName = in.readUTF();
-    tableId = in.readUTF();
-    int numLocs = in.readInt();
-    locations = new String[numLocs];
-    for (int i = 0; i < numLocs; ++i)
-      locations[i] = in.readUTF();
-
-    if (in.readBoolean()) {
-      isolatedScan = in.readBoolean();
-    }
-
-    if (in.readBoolean()) {
-      offline = in.readBoolean();
-    }
-
-    if (in.readBoolean()) {
-      localIterators = in.readBoolean();
-    }
-
-    if (in.readBoolean()) {
-      mockInstance = in.readBoolean();
-    }
-
-    if (in.readBoolean()) {
-      int numColumns = in.readInt();
-      List<String> columns = new ArrayList<String>(numColumns);
-      for (int i = 0; i < numColumns; i++) {
-        columns.add(in.readUTF());
-      }
-
-      fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns);
-    }
-
-    if (in.readBoolean()) {
-      String strAuths = in.readUTF();
-      auths = new Authorizations(strAuths.getBytes(StandardCharsets.UTF_8));
-    }
-
-    if (in.readBoolean()) {
-      principal = in.readUTF();
-    }
-
-    if (in.readBoolean()) {
-      int ordinal = in.readInt();
-      this.tokenSource = TokenSource.values()[ordinal];
-
-      switch (this.tokenSource) {
-        case INLINE:
-          String tokenClass = in.readUTF();
-          byte[] base64TokenBytes = in.readUTF().getBytes(StandardCharsets.UTF_8);
-          byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes);
-
-          this.token = AuthenticationTokenSerializer.deserialize(tokenClass, tokenBytes);
-          break;
-
-        case FILE:
-          this.tokenFile = in.readUTF();
-
-          break;
-        default:
-          throw new IOException("Cannot parse unknown TokenSource ordinal");
-      }
-    }
-
-    if (in.readBoolean()) {
-      instanceName = in.readUTF();
-    }
-
-    if (in.readBoolean()) {
-      zooKeepers = in.readUTF();
-    }
-
-    if (in.readBoolean()) {
-      level = Level.toLevel(in.readInt());
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    range.write(out);
-    out.writeUTF(tableName);
-    out.writeUTF(tableId);
-    out.writeInt(locations.length);
-    for (int i = 0; i < locations.length; ++i)
-      out.writeUTF(locations[i]);
-
-    out.writeBoolean(null != isolatedScan);
-    if (null != isolatedScan) {
-      out.writeBoolean(isolatedScan);
-    }
-
-    out.writeBoolean(null != offline);
-    if (null != offline) {
-      out.writeBoolean(offline);
-    }
-
-    out.writeBoolean(null != localIterators);
-    if (null != localIterators) {
-      out.writeBoolean(localIterators);
-    }
-
-    out.writeBoolean(null != mockInstance);
-    if (null != mockInstance) {
-      out.writeBoolean(mockInstance);
-    }
-
-    out.writeBoolean(null != fetchedColumns);
-    if (null != fetchedColumns) {
-      String[] cols = InputConfigurator.serializeColumns(fetchedColumns);
-      out.writeInt(cols.length);
-      for (String col : cols) {
-        out.writeUTF(col);
-      }
-    }
-
-    out.writeBoolean(null != auths);
-    if (null != auths) {
-      out.writeUTF(auths.serialize());
-    }
-
-    out.writeBoolean(null != principal);
-    if (null != principal) {
-      out.writeUTF(principal);
-    }
-
-    out.writeBoolean(null != tokenSource);
-    if (null != tokenSource) {
-      out.writeInt(tokenSource.ordinal());
-
-      if (null != token && null != tokenFile) {
-        throw new IOException("Cannot use both inline AuthenticationToken and file-based AuthenticationToken");
-      } else if (null != token) {
-        out.writeUTF(token.getClass().getCanonicalName());
-        out.writeUTF(Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token)));
-      } else {
-        out.writeUTF(tokenFile);
-      }
-    }
-
-    out.writeBoolean(null != instanceName);
-    if (null != instanceName) {
-      out.writeUTF(instanceName);
-    }
-
-    out.writeBoolean(null != zooKeepers);
-    if (null != zooKeepers) {
-      out.writeUTF(zooKeepers);
-    }
-
-    out.writeBoolean(null != level);
-    if (null != level) {
-      out.writeInt(level.toInt());
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder(256);
-    sb.append("Range: ").append(range);
-    sb.append(" Locations: ").append(Arrays.asList(locations));
-    sb.append(" Table: ").append(tableName);
-    sb.append(" TableID: ").append(tableId);
-    sb.append(" InstanceName: ").append(instanceName);
-    sb.append(" zooKeepers: ").append(zooKeepers);
-    sb.append(" principal: ").append(principal);
-    sb.append(" tokenSource: ").append(tokenSource);
-    sb.append(" authenticationToken: ").append(token);
-    sb.append(" authenticationTokenFile: ").append(tokenFile);
-    sb.append(" Authorizations: ").append(auths);
-    sb.append(" offlineScan: ").append(offline);
-    sb.append(" mockInstance: ").append(mockInstance);
-    sb.append(" isolatedScan: ").append(isolatedScan);
-    sb.append(" localIterators: ").append(localIterators);
-    sb.append(" fetchColumns: ").append(fetchedColumns);
-    sb.append(" iterators: ").append(iterators);
-    sb.append(" logLevel: ").append(level);
-    return sb.toString();
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-
-  public void setTableName(String table) {
-    this.tableName = table;
-  }
-
-  public void setTableId(String tableId) {
-    this.tableId = tableId;
-  }
-
-  public String getTableId() {
-    return tableId;
-  }
-
-  public Instance getInstance() {
-    if (null == instanceName) {
-      return null;
-    }
-
-    if (isMockInstance()) {
-      return new MockInstance(getInstanceName());
-    }
-
-    if (null == zooKeepers) {
-      return null;
-    }
-
-    return new ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(getInstanceName()).withZkHosts(getZooKeepers()));
-  }
-
-  public String getInstanceName() {
-    return instanceName;
-  }
-
-  public void setInstanceName(String instanceName) {
-    this.instanceName = instanceName;
-  }
-
-  public String getZooKeepers() {
-    return zooKeepers;
-  }
-
-  public void setZooKeepers(String zooKeepers) {
-    this.zooKeepers = zooKeepers;
-  }
-
-  public String getPrincipal() {
-    return principal;
-  }
-
-  public void setPrincipal(String principal) {
-    this.principal = principal;
-  }
-
-  public AuthenticationToken getToken() {
-    return token;
-  }
-
-  public void setToken(AuthenticationToken token) {
-    this.tokenSource = TokenSource.INLINE;
-    this.token = token;
-  }
-
-  public void setToken(String tokenFile) {
-    this.tokenSource = TokenSource.FILE;
-    this.tokenFile = tokenFile;
-  }
-
-  public Boolean isOffline() {
-    return offline;
-  }
-
-  public void setOffline(Boolean offline) {
-    this.offline = offline;
-  }
-
-  public void setLocations(String[] locations) {
-    this.locations = Arrays.copyOf(locations, locations.length);
-  }
-
-  public Boolean isMockInstance() {
-    return mockInstance;
-  }
-
-  public void setMockInstance(Boolean mockInstance) {
-    this.mockInstance = mockInstance;
-  }
-
-  public Boolean isIsolatedScan() {
-    return isolatedScan;
-  }
-
-  public void setIsolatedScan(Boolean isolatedScan) {
-    this.isolatedScan = isolatedScan;
-  }
-
-  public Authorizations getAuths() {
-    return auths;
-  }
-
-  public void setAuths(Authorizations auths) {
-    this.auths = auths;
-  }
-
-  public void setRange(Range range) {
-    this.range = range;
-  }
-
-  public Boolean usesLocalIterators() {
-    return localIterators;
-  }
-
-  public void setUsesLocalIterators(Boolean localIterators) {
-    this.localIterators = localIterators;
-  }
-
-  public Set<Pair<Text,Text>> getFetchedColumns() {
-    return fetchedColumns;
-  }
-
-  public void setFetchedColumns(Collection<Pair<Text,Text>> fetchedColumns) {
-    this.fetchedColumns = new HashSet<Pair<Text,Text>>();
-    for (Pair<Text,Text> columns : fetchedColumns) {
-      this.fetchedColumns.add(columns);
-    }
-  }
-
-  public void setFetchedColumns(Set<Pair<Text,Text>> fetchedColumns) {
-    this.fetchedColumns = fetchedColumns;
-  }
-
-  public List<IteratorSetting> getIterators() {
-    return iterators;
-  }
-
-  public void setIterators(List<IteratorSetting> iterators) {
-    this.iterators = iterators;
-  }
-
-  public Level getLogLevel() {
-    return level;
-  }
-
-  public void setLogLevel(Level level) {
-    this.level = level;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
deleted file mode 100644
index 4610556..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce.lib.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
-import org.apache.accumulo.core.security.Credentials;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-/**
- * @since 1.6.0
- */
-public class ConfiguratorBase {
-
-  /**
-   * Configuration keys for {@link Instance#getConnector(String, AuthenticationToken)}.
-   * 
-   * @since 1.6.0
-   */
-  public static enum ConnectorInfo {
-    IS_CONFIGURED, PRINCIPAL, TOKEN,
-  }
-
-  public static enum TokenSource {
-    FILE, INLINE;
-
-    private String prefix;
-
-    private TokenSource() {
-      prefix = name().toLowerCase() + ":";
-    }
-
-    public String prefix() {
-      return prefix;
-    }
-  }
-
-  /**
-   * Configuration keys for {@link Instance}, {@link ZooKeeperInstance}, and {@link MockInstance}.
-   * 
-   * @since 1.6.0
-   */
-  public static enum InstanceOpts {
-    TYPE, NAME, ZOO_KEEPERS, CLIENT_CONFIG;
-  }
-
-  /**
-   * Configuration keys for general configuration options.
-   * 
-   * @since 1.6.0
-   */
-  public static enum GeneralOpts {
-    LOG_LEVEL
-  }
-
-  /**
-   * Provides a configuration key for a given feature enum, prefixed by the implementingClass
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param e
-   *          the enum used to provide the unique part of the configuration key
-   * @return the configuration key
-   * @since 1.6.0
-   */
-  protected static String enumToConfKey(Class<?> implementingClass, Enum<?> e) {
-    return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "." + StringUtils.camelize(e.name().toLowerCase());
-  }
-
-  /**
-   * Sets the connector information needed to communicate with Accumulo in this job.
-   * 
-   * <p>
-   * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
-   * conversion to a string, and is not intended to be secure.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param principal
-   *          a valid Accumulo user name
-   * @param token
-   *          the user's password
-   * @since 1.6.0
-   */
-  public static void setConnectorInfo(Class<?> implementingClass, Configuration conf, String principal, AuthenticationToken token)
-      throws AccumuloSecurityException {
-    if (isConnectorInfoSet(implementingClass, conf))
-      throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job");
-
-    checkArgument(principal != null, "principal is null");
-    checkArgument(token != null, "token is null");
-    conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
-    conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal);
-    conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN),
-        TokenSource.INLINE.prefix() + token.getClass().getName() + ":" + Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token)));
-  }
-
-  /**
-   * Sets the connector information needed to communicate with Accumulo in this job.
-   * 
-   * <p>
-   * Pulls a token file into the Distributed Cache that contains the authentication token in an attempt to be more secure than storing the password in the
-   * Configuration. Token file created with "bin/accumulo create-token".
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param principal
-   *          a valid Accumulo user name
-   * @param tokenFile
-   *          the path to the token file in DFS
-   * @since 1.6.0
-   */
-  public static void setConnectorInfo(Class<?> implementingClass, Configuration conf, String principal, String tokenFile) throws AccumuloSecurityException {
-    if (isConnectorInfoSet(implementingClass, conf))
-      throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job");
-
-    checkArgument(principal != null, "principal is null");
-    checkArgument(tokenFile != null, "tokenFile is null");
-
-    try {
-      DistributedCacheHelper.addCacheFile(new URI(tokenFile), conf);
-    } catch (URISyntaxException e) {
-      throw new IllegalStateException("Unable to add tokenFile \"" + tokenFile + "\" to distributed cache.");
-    }
-
-    conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
-    conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal);
-    conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), TokenSource.FILE.prefix() + tokenFile);
-  }
-
-  /**
-   * Determines if the connector info has already been set for this instance.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return true if the connector info has already been set, false otherwise
-   * @since 1.6.0
-   * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
-   */
-  public static Boolean isConnectorInfoSet(Class<?> implementingClass, Configuration conf) {
-    return conf.getBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), false);
-  }
-
-  /**
-   * Gets the user name from the configuration.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return the principal
-   * @since 1.6.0
-   * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
-   */
-  public static String getPrincipal(Class<?> implementingClass, Configuration conf) {
-    return conf.get(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL));
-  }
-
-  /**
-   * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return the principal's authentication token
-   * @since 1.6.0
-   * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
-   * @see #setConnectorInfo(Class, Configuration, String, String)
-   */
-  public static AuthenticationToken getAuthenticationToken(Class<?> implementingClass, Configuration conf) {
-    String token = conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN));
-    if (token == null || token.isEmpty())
-      return null;
-    if (token.startsWith(TokenSource.INLINE.prefix())) {
-      String[] args = token.substring(TokenSource.INLINE.prefix().length()).split(":", 2);
-      if (args.length == 2)
-        return AuthenticationTokenSerializer.deserialize(args[0], Base64.decodeBase64(args[1].getBytes(StandardCharsets.UTF_8)));
-    } else if (token.startsWith(TokenSource.FILE.prefix())) {
-      String tokenFileName = token.substring(TokenSource.FILE.prefix().length());
-      return getTokenFromFile(conf, getPrincipal(implementingClass, conf), tokenFileName);
-    }
-
-    throw new IllegalStateException("Token was not properly serialized into the configuration");
-  }
-
-  /**
-   * Reads from the token file in distributed cache. Currently, the token file stores data separated by colons e.g. principal:token_class:token
-   * 
-   * @param conf
-   *          the Hadoop context for the configured job
-   * @return path to the token file as a String
-   * @since 1.6.0
-   * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
-   */
-  public static AuthenticationToken getTokenFromFile(Configuration conf, String principal, String tokenFile) {
-    FSDataInputStream in = null;
-    try {
-      URI[] uris = DistributedCacheHelper.getCacheFiles(conf);
-      Path path = null;
-      for (URI u : uris) {
-        if (u.toString().equals(tokenFile)) {
-          path = new Path(u);
-        }
-      }
-      if (path == null) {
-        throw new IllegalArgumentException("Couldn't find password file called \"" + tokenFile + "\" in cache.");
-      }
-      FileSystem fs = FileSystem.get(conf);
-      in = fs.open(path);
-    } catch (IOException e) {
-      throw new IllegalArgumentException("Couldn't open password file called \"" + tokenFile + "\".");
-    }
-    try (java.util.Scanner fileScanner = new java.util.Scanner(in)) {
-      while (fileScanner.hasNextLine()) {
-        Credentials creds = Credentials.deserialize(fileScanner.nextLine());
-        if (principal.equals(creds.getPrincipal())) {
-          return creds.getToken();
-        }
-      }
-      throw new IllegalArgumentException("Couldn't find token for user \"" + principal + "\" in file \"" + tokenFile + "\"");
-    }
-  }
-
-  /**
-   * Configures a {@link ZooKeeperInstance} for this job.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param clientConfig
-   *          client configuration for specifying connection timeouts, SSL connection options, etc.
-   * @since 1.6.0
-   */
-  public static void setZooKeeperInstance(Class<?> implementingClass, Configuration conf, ClientConfiguration clientConfig) {
-    String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
-    if (!conf.get(key, "").isEmpty())
-      throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key));
-    conf.set(key, "ZooKeeperInstance");
-    if (clientConfig != null) {
-      conf.set(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG), clientConfig.serialize());
-    }
-  }
-
-  /**
-   * Configures a {@link MockInstance} for this job.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param instanceName
-   *          the Accumulo instance name
-   * @since 1.6.0
-   */
-  public static void setMockInstance(Class<?> implementingClass, Configuration conf, String instanceName) {
-    String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
-    if (!conf.get(key, "").isEmpty())
-      throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key));
-    conf.set(key, "MockInstance");
-
-    checkArgument(instanceName != null, "instanceName is null");
-    conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName);
-  }
-
-  /**
-   * Initializes an Accumulo {@link Instance} based on the configuration.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return an Accumulo instance
-   * @since 1.6.0
-   * @see #setZooKeeperInstance(Class, Configuration, ClientConfiguration)
-   * @see #setMockInstance(Class, Configuration, String)
-   */
-  public static Instance getInstance(Class<?> implementingClass, Configuration conf) {
-    String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE), "");
-    if ("MockInstance".equals(instanceType))
-      return new MockInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)));
-    else if ("ZooKeeperInstance".equals(instanceType)) {
-      String clientConfigString = conf.get(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG));
-      if (clientConfigString == null) {
-        String instanceName = conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME));
-        String zookeepers = conf.get(enumToConfKey(implementingClass, InstanceOpts.ZOO_KEEPERS));
-        return new ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers));
-      } else {
-        return new ZooKeeperInstance(ClientConfiguration.deserialize(clientConfigString));
-      }
-    } else if (instanceType.isEmpty())
-      throw new IllegalStateException("Instance has not been configured for " + implementingClass.getSimpleName());
-    else
-      throw new IllegalStateException("Unrecognized instance type " + instanceType);
-  }
-
-  /**
-   * Sets the log level for this job.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param level
-   *          the logging level
-   * @since 1.6.0
-   */
-  public static void setLogLevel(Class<?> implementingClass, Configuration conf, Level level) {
-    checkArgument(level != null, "level is null");
-    Logger.getLogger(implementingClass).setLevel(level);
-    conf.setInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), level.toInt());
-  }
-
-  /**
-   * Gets the log level from this configuration.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return the log level
-   * @since 1.6.0
-   * @see #setLogLevel(Class, Configuration, Level)
-   */
-  public static Level getLogLevel(Class<?> implementingClass, Configuration conf) {
-    return Level.toLevel(conf.getInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), Level.INFO.toInt()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java
deleted file mode 100644
index c694b9a..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce.lib.impl;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.Path;
-
-/**
- * @since 1.6.0
- */
-@SuppressWarnings("deprecation")
-public class DistributedCacheHelper {
-
-  /**
-   * @since 1.6.0
-   */
-  public static void addCacheFile(URI uri, Configuration conf) {
-    DistributedCache.addCacheFile(uri, conf);
-  }
-
-  /**
-   * @since 1.6.0
-   */
-  public static URI[] getCacheFiles(Configuration conf) throws IOException {
-    return DistributedCache.getCacheFiles(conf);
-  }
-
-  /**
-   * @since 1.6.0
-   */
-  public static Path[] getLocalCacheFiles(Configuration conf) throws IOException {
-    return DistributedCache.getLocalCacheFiles(conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
deleted file mode 100644
index ce84209..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce.lib.impl;
-
-import java.util.Arrays;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * @since 1.6.0
- */
-public class FileOutputConfigurator extends ConfiguratorBase {
-
-  /**
-   * Configuration keys for {@link AccumuloConfiguration}.
-   * 
-   * @since 1.6.0
-   */
-  public static enum Opts {
-    ACCUMULO_PROPERTIES;
-  }
-
-  /**
-   * The supported Accumulo properties we set in this OutputFormat, that change the behavior of the RecordWriter.<br />
-   * These properties correspond to the supported public static setter methods available to this class.
-   * 
-   * @param property
-   *          the Accumulo property to check
-   * @since 1.6.0
-   */
-  protected static Boolean isSupportedAccumuloProperty(Property property) {
-    switch (property) {
-      case TABLE_FILE_COMPRESSION_TYPE:
-      case TABLE_FILE_COMPRESSED_BLOCK_SIZE:
-      case TABLE_FILE_BLOCK_SIZE:
-      case TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX:
-      case TABLE_FILE_REPLICATION:
-        return true;
-      default:
-        return false;
-    }
-  }
-
-  /**
-   * Helper for transforming Accumulo configuration properties into something that can be stored safely inside the Hadoop Job configuration.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param property
-   *          the supported Accumulo property
-   * @param value
-   *          the value of the property to set
-   * @since 1.6.0
-   */
-  private static <T> void setAccumuloProperty(Class<?> implementingClass, Configuration conf, Property property, T value) {
-    if (isSupportedAccumuloProperty(property)) {
-      String val = String.valueOf(value);
-      if (property.getType().isValidFormat(val))
-        conf.set(enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + "." + property.getKey(), val);
-      else
-        throw new IllegalArgumentException("Value is not appropriate for property type '" + property.getType() + "'");
-    } else
-      throw new IllegalArgumentException("Unsupported configuration property " + property.getKey());
-  }
-
-  /**
-   * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been
-   * stored in the Job's configuration.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @since 1.6.0
-   */
-  public static AccumuloConfiguration getAccumuloConfiguration(Class<?> implementingClass, Configuration conf) {
-    String prefix = enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + ".";
-    ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration());
-    for (Entry<String,String> entry : conf)
-      if (entry.getKey().startsWith(prefix))
-        acuConf.set(Property.getPropertyByKey(entry.getKey().substring(prefix.length())), entry.getValue());
-    return acuConf;
-  }
-
-  /**
-   * Sets the compression type to use for data blocks. Specifying a compression may require additional libraries to be available to your Job.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param compressionType
-   *          one of "none", "gz", "lzo", or "snappy"
-   * @since 1.6.0
-   */
-  public static void setCompressionType(Class<?> implementingClass, Configuration conf, String compressionType) {
-    if (compressionType == null || !Arrays.asList("none", "gz", "lzo", "snappy").contains(compressionType))
-      throw new IllegalArgumentException("Compression type must be one of: none, gz, lzo, snappy");
-    setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType);
-  }
-
-  /**
-   * Sets the size for data blocks within each file.<br />
-   * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed as a group.
-   * 
-   * <p>
-   * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance).
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param dataBlockSize
-   *          the block size, in bytes
-   * @since 1.6.0
-   */
-  public static void setDataBlockSize(Class<?> implementingClass, Configuration conf, long dataBlockSize) {
-    setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, dataBlockSize);
-  }
-
-  /**
-   * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param fileBlockSize
-   *          the block size, in bytes
-   * @since 1.6.0
-   */
-  public static void setFileBlockSize(Class<?> implementingClass, Configuration conf, long fileBlockSize) {
-    setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_BLOCK_SIZE, fileBlockSize);
-  }
-
-  /**
-   * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy within the file, while larger blocks mean a more shallow
-   * index hierarchy within the file. This can affect the performance of queries.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param indexBlockSize
-   *          the block size, in bytes
-   * @since 1.6.0
-   */
-  public static void setIndexBlockSize(Class<?> implementingClass, Configuration conf, long indexBlockSize) {
-    setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, indexBlockSize);
-  }
-
-  /**
-   * Sets the file system replication factor for the resulting file, overriding the file system default.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param replication
-   *          the number of replicas for produced files
-   * @since 1.6.0
-   */
-  public static void setReplication(Class<?> implementingClass, Configuration conf, int replication) {
-    setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_REPLICATION, replication);
-  }
-
-}


Mime
View raw message