accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [48/50] [abbrv] Merge branch '1.6'
Date Sat, 01 Nov 2014 04:57:42 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
----------------------------------------------------------------------
diff --cc mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
index 339c9a8,0000000..90a8622
mode 100644,000000..100644
--- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
+++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
@@@ -1,796 -1,0 +1,796 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mapreduce.lib.impl;
 +
 +import static com.google.common.base.Preconditions.checkArgument;
++import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
- import java.nio.charset.StandardCharsets;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.StringTokenizer;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.TabletLocator;
 +import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
 +import org.apache.accumulo.core.client.mock.impl.MockTabletLocator;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.util.Base64;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.MapWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.util.StringUtils;
 +
 +import com.google.common.collect.Maps;
 +
 +/**
 + * @since 1.6.0
 + */
 +public class InputConfigurator extends ConfiguratorBase {
 +
 +  /**
 +   * Configuration keys for {@link Scanner}.
 +   * 
 +   * @since 1.6.0
 +   */
 +  public static enum ScanOpts {
 +    TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS, TABLE_CONFIGS
 +  }
 +
 +  /**
 +   * Configuration keys for various features.
 +   * 
 +   * @since 1.6.0
 +   */
 +  public static enum Features {
 +    AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE
 +  }
 +
 +  /**
 +   * Sets the name of the input table, over which this job will scan.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param tableName
 +   *          the name of the table to scan; must not be null
 +   * @since 1.6.0
 +   */
 +  public static void setInputTableName(Class<?> implementingClass, Configuration conf, String tableName) {
 +    checkArgument(tableName != null, "tableName is null");
 +    conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName);
 +  }
 +
 +  /**
 +   * Gets the name of the input table, over which this job will scan.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @since 1.6.0
 +   */
 +  public static String getInputTableName(Class<?> implementingClass, Configuration conf) {
 +    return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME));
 +  }
 +
 +  /**
 +   * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations. Defaults to the empty set.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param auths
 +   *          the user's authorizations
 +   * @since 1.6.0
 +   */
 +  public static void setScanAuthorizations(Class<?> implementingClass, Configuration conf, Authorizations auths) {
 +    if (auths != null && !auths.isEmpty())
 +      conf.set(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS), auths.serialize());
 +  }
 +
 +  /**
 +   * Gets the authorizations to set for the scans from the configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the Accumulo scan authorizations
 +   * @since 1.6.0
 +   * @see #setScanAuthorizations(Class, Configuration, Authorizations)
 +   */
 +  public static Authorizations getScanAuthorizations(Class<?> implementingClass, Configuration conf) {
 +    String authString = conf.get(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS));
-     return authString == null ? Authorizations.EMPTY : new Authorizations(authString.getBytes(StandardCharsets.UTF_8));
++    return authString == null ? Authorizations.EMPTY : new Authorizations(authString.getBytes(UTF_8));
 +  }
 +
 +  /**
 +   * Sets the input ranges to scan on all input tables for this job. If not set, the entire table will be scanned.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param ranges
 +   *          the ranges that will be mapped over
 +   * @throws IllegalArgumentException
 +   *           if the ranges cannot be encoded into base 64
 +   * @since 1.6.0
 +   */
 +  public static void setRanges(Class<?> implementingClass, Configuration conf, Collection<Range> ranges) {
 +    checkArgument(ranges != null, "ranges is null");
 +
 +    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
 +    try {
 +      for (Range r : ranges) {
 +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        r.write(new DataOutputStream(baos));
 +        rangeStrings.add(Base64.encodeBase64String(baos.toByteArray()));
 +      }
 +      conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
 +    } catch (IOException ex) {
 +      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
 +    }
 +  }
 +
 +  /**
 +   * Gets the ranges to scan over from a job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the ranges
 +   * @throws IOException
 +   *           if the ranges have been encoded improperly
 +   * @since 1.6.0
 +   * @see #setRanges(Class, Configuration, Collection)
 +   */
 +  public static List<Range> getRanges(Class<?> implementingClass, Configuration conf) throws IOException {
 +
 +    Collection<String> encodedRanges = conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES));
 +    List<Range> ranges = new ArrayList<Range>();
 +    for (String rangeString : encodedRanges) {
-       ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(StandardCharsets.UTF_8)));
++      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(UTF_8)));
 +      Range range = new Range();
 +      range.readFields(new DataInputStream(bais));
 +      ranges.add(range);
 +    }
 +    return ranges;
 +  }
 +
 +  /**
 +   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return a list of iterators
 +   * @since 1.6.0
 +   * @see #addIterator(Class, Configuration, IteratorSetting)
 +   */
 +  public static List<IteratorSetting> getIterators(Class<?> implementingClass, Configuration conf) {
 +    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
 +
 +    // If no iterators are present, return an empty list
 +    if (iterators == null || iterators.isEmpty())
 +      return new ArrayList<IteratorSetting>();
 +
 +    // Compose the set of iterators encoded in the job configuration
 +    StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR);
 +    List<IteratorSetting> list = new ArrayList<IteratorSetting>();
 +    try {
 +      while (tokens.hasMoreTokens()) {
 +        String itstring = tokens.nextToken();
-         ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes(StandardCharsets.UTF_8)));
++        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes(UTF_8)));
 +        list.add(new IteratorSetting(new DataInputStream(bais)));
 +        bais.close();
 +      }
 +    } catch (IOException e) {
 +      throw new IllegalArgumentException("couldn't decode iterator settings");
 +    }
 +    return list;
 +  }
 +
 +  /**
 +   * Restricts the columns that will be mapped over for the single input table on this job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param columnFamilyColumnQualifierPairs
 +   *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
 +   *          selected. An empty set is the default and is equivalent to scanning all the columns.
 +   * @throws IllegalArgumentException
 +   *           if the column family is null
 +   * @since 1.6.0
 +   */
 +  public static void fetchColumns(Class<?> implementingClass, Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
 +    checkArgument(columnFamilyColumnQualifierPairs != null, "columnFamilyColumnQualifierPairs is null");
 +    String[] columnStrings = serializeColumns(columnFamilyColumnQualifierPairs);
 +    conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings);
 +  }
 +
 +  public static String[] serializeColumns(Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
 +    checkArgument(columnFamilyColumnQualifierPairs != null, "columnFamilyColumnQualifierPairs is null");
 +    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
 +    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
 +
 +      if (column.getFirst() == null)
 +        throw new IllegalArgumentException("Column family can not be null");
 +
 +      String col = Base64.encodeBase64String(TextUtil.getBytes(column.getFirst()));
 +      if (column.getSecond() != null)
 +        col += ":" + Base64.encodeBase64String(TextUtil.getBytes(column.getSecond()));
 +      columnStrings.add(col);
 +    }
 +
 +    return columnStrings.toArray(new String[0]);
 +  }
 +
 +  /**
 +   * Gets the columns to be mapped over from this job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return a set of columns
 +   * @since 1.6.0
 +   * @see #fetchColumns(Class, Configuration, Collection)
 +   */
 +  public static Set<Pair<Text,Text>> getFetchedColumns(Class<?> implementingClass, Configuration conf) {
 +    checkArgument(conf != null, "conf is null");
 +    String confValue = conf.get(enumToConfKey(implementingClass, ScanOpts.COLUMNS));
 +    List<String> serialized = new ArrayList<String>();
 +    if (confValue != null) {
 +      // Split and include any trailing empty strings to allow empty column families
 +      for (String val : confValue.split(",", -1)) {
 +        serialized.add(val);
 +      }
 +    }
 +    return deserializeFetchedColumns(serialized);
 +  }
 +
 +  public static Set<Pair<Text,Text>> deserializeFetchedColumns(Collection<String> serialized) {
 +    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
 +
 +    if (null == serialized) {
 +      return columns;
 +    }
 +
 +    for (String col : serialized) {
 +      int idx = col.indexOf(":");
-       Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(StandardCharsets.UTF_8)) : Base64.decodeBase64(col.substring(0, idx).getBytes(
-           StandardCharsets.UTF_8)));
-       Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes(StandardCharsets.UTF_8)));
++      Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(UTF_8)) : Base64.decodeBase64(col.substring(0, idx).getBytes(
++          UTF_8)));
++      Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes(UTF_8)));
 +      columns.add(new Pair<Text,Text>(cf, cq));
 +    }
 +    return columns;
 +  }
 +
 +  /**
 +   * Encode an iterator on the input for the single input table associated with this job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param cfg
 +   *          the configuration of the iterator
 +   * @throws IllegalArgumentException
 +   *           if the iterator can't be serialized into the configuration
 +   * @since 1.6.0
 +   */
 +  public static void addIterator(Class<?> implementingClass, Configuration conf, IteratorSetting cfg) {
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    String newIter;
 +    try {
 +      cfg.write(new DataOutputStream(baos));
 +      newIter = Base64.encodeBase64String(baos.toByteArray());
 +      baos.close();
 +    } catch (IOException e) {
 +      throw new IllegalArgumentException("unable to serialize IteratorSetting");
 +    }
 +
 +    String confKey = enumToConfKey(implementingClass, ScanOpts.ITERATORS);
 +    String iterators = conf.get(confKey);
 +    // No iterators specified yet, create a new string
 +    if (iterators == null || iterators.isEmpty()) {
 +      iterators = newIter;
 +    } else {
 +      // append the next iterator & reset
 +      iterators = iterators.concat(StringUtils.COMMA_STR + newIter);
 +    }
 +    // Store the iterators w/ the job
 +    conf.set(confKey, iterators);
 +  }
 +
 +  /**
 +   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
 +   * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled.
 +   * 
 +   * <p>
 +   * By default, this feature is <b>enabled</b>.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @see #setRanges(Class, Configuration, Collection)
 +   * @since 1.6.0
 +   */
 +  public static void setAutoAdjustRanges(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether a configuration has auto-adjust ranges enabled.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return false if the feature is disabled, true otherwise
 +   * @since 1.6.0
 +   * @see #setAutoAdjustRanges(Class, Configuration, boolean)
 +   */
 +  public static Boolean getAutoAdjustRanges(Class<?> implementingClass, Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), true);
 +  }
 +
 +  /**
 +   * Controls the use of the {@link IsolatedScanner} in this job.
 +   * 
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.6.0
 +   */
 +  public static void setScanIsolation(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether a configuration has isolation enabled.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.6.0
 +   * @see #setScanIsolation(Class, Configuration, boolean)
 +   */
 +  public static Boolean isIsolated(Class<?> implementingClass, Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), false);
 +  }
 +
 +  /**
 +   * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
 +   * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
 +   * 
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.6.0
 +   */
 +  public static void setLocalIterators(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether a configuration uses local iterators.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.6.0
 +   * @see #setLocalIterators(Class, Configuration, boolean)
 +   */
 +  public static Boolean usesLocalIterators(Class<?> implementingClass, Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false);
 +  }
 +
 +  /**
 +   * <p>
 +   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
 +   * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
 +   * fail.
 +   * 
 +   * <p>
 +   * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
 +   * 
 +   * <p>
 +   * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
 +   * on the mapper's classpath.
 +   * 
 +   * <p>
 +   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
 +   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
 +   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
 +   * 
 +   * <p>
 +   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
 +   * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
 +   * 
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.6.0
 +   */
 +  public static void setOfflineTableScan(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether a configuration has the offline table scan feature enabled.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.6.0
 +   * @see #setOfflineTableScan(Class, Configuration, boolean)
 +   */
 +  public static Boolean isOfflineScan(Class<?> implementingClass, Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), false);
 +  }
 +
 +  /**
 +   * Sets configurations for multiple tables at a time.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param configs
 +   *          a map from table name to the {@link InputTableConfig} to associate with the job for that table
 +   * @since 1.6.0
 +   */
 +  public static void setInputTableConfigs(Class<?> implementingClass, Configuration conf, Map<String,InputTableConfig> configs) {
 +    MapWritable mapWritable = new MapWritable();
 +    for (Map.Entry<String,InputTableConfig> tableConfig : configs.entrySet())
 +      mapWritable.put(new Text(tableConfig.getKey()), tableConfig.getValue());
 +
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    try {
 +      mapWritable.write(new DataOutputStream(baos));
 +    } catch (IOException e) {
 +      throw new IllegalStateException("Table configuration could not be serialized.");
 +    }
 +
 +    String confKey = enumToConfKey(implementingClass, ScanOpts.TABLE_CONFIGS);
 +    conf.set(confKey, Base64.encodeBase64String(baos.toByteArray()));
 +  }
 +
 +  /**
 +   * Returns all {@link InputTableConfig} objects associated with this job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return all of the table query configs for the job
 +   * @since 1.6.0
 +   */
 +  public static Map<String,InputTableConfig> getInputTableConfigs(Class<?> implementingClass, Configuration conf) {
 +    Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
 +    Map.Entry<String,InputTableConfig> defaultConfig = getDefaultInputTableConfig(implementingClass, conf);
 +    if (defaultConfig != null)
 +      configs.put(defaultConfig.getKey(), defaultConfig.getValue());
 +    String configString = conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_CONFIGS));
 +    MapWritable mapWritable = new MapWritable();
 +    if (configString != null) {
 +      try {
-         byte[] bytes = Base64.decodeBase64(configString.getBytes(StandardCharsets.UTF_8));
++        byte[] bytes = Base64.decodeBase64(configString.getBytes(UTF_8));
 +        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
 +        mapWritable.readFields(new DataInputStream(bais));
 +        bais.close();
 +      } catch (IOException e) {
 +        throw new IllegalStateException("The table query configurations could not be deserialized from the given configuration");
 +      }
 +    }
 +    for (Map.Entry<Writable,Writable> entry : mapWritable.entrySet())
 +      configs.put(((Text) entry.getKey()).toString(), (InputTableConfig) entry.getValue());
 +
 +    return configs;
 +  }
 +
 +  /**
 +   * Returns the {@link InputTableConfig} for the given table
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param tableName
 +   *          the table name for which to fetch the table query config
 +   * @return the table query config for the given table name (if it exists) and null if it does not
 +   * @since 1.6.0
 +   */
 +  public static InputTableConfig getInputTableConfig(Class<?> implementingClass, Configuration conf, String tableName) {
 +    Map<String,InputTableConfig> queryConfigs = getInputTableConfigs(implementingClass, conf);
 +    return queryConfigs.get(tableName);
 +  }
 +
 +  /**
 +   * Initializes an Accumulo {@link TabletLocator} based on the configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param tableId
 +   *          The table id for which to initialize the {@link TabletLocator}
 +   * @return an Accumulo tablet locator
 +   * @throws TableNotFoundException
 +   *           if the table name set on the configuration doesn't exist
 +   * @since 1.6.0
 +   */
 +  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf, String tableId) throws TableNotFoundException {
 +    String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
 +    if ("MockInstance".equals(instanceType))
 +      return new MockTabletLocator();
 +    Instance instance = getInstance(implementingClass, conf);
 +    return TabletLocator.getLocator(instance, new Text(tableId));
 +  }
 +
 +  // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
 +  /**
 +   * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @throws IOException
 +   *           if the context is improperly configured
 +   * @since 1.6.0
 +   */
 +  public static void validateOptions(Class<?> implementingClass, Configuration conf) throws IOException {
 +
 +    Map<String,InputTableConfig> inputTableConfigs = getInputTableConfigs(implementingClass, conf);
 +    if (!isConnectorInfoSet(implementingClass, conf))
 +      throw new IOException("Input info has not been set.");
 +    String instanceKey = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
 +    if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey))
 +      throw new IOException("Instance info has not been set.");
 +    // validate that we can connect as configured
 +    try {
 +      String principal = getPrincipal(implementingClass, conf);
 +      AuthenticationToken token = getAuthenticationToken(implementingClass, conf);
 +      Connector c = getInstance(implementingClass, conf).getConnector(principal, token);
 +      if (!c.securityOperations().authenticateUser(principal, token))
 +        throw new IOException("Unable to authenticate user");
 +
 +      if (getInputTableConfigs(implementingClass, conf).size() == 0)
 +        throw new IOException("No table set.");
 +
 +      for (Map.Entry<String,InputTableConfig> tableConfig : inputTableConfigs.entrySet()) {
 +        if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), tableConfig.getKey(), TablePermission.READ))
 +          throw new IOException("Unable to access table");
 +      }
 +      for (Map.Entry<String,InputTableConfig> tableConfigEntry : inputTableConfigs.entrySet()) {
 +        InputTableConfig tableConfig = tableConfigEntry.getValue();
 +        if (!tableConfig.shouldUseLocalIterators()) {
 +          if (tableConfig.getIterators() != null) {
 +            for (IteratorSetting iter : tableConfig.getIterators()) {
 +              if (!c.tableOperations().testClassLoad(tableConfigEntry.getKey(), iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
 +                throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
 +            }
 +          }
 +        }
 +      }
 +    } catch (AccumuloException e) {
 +      throw new IOException(e);
 +    } catch (AccumuloSecurityException e) {
 +      throw new IOException(e);
 +    } catch (TableNotFoundException e) {
 +      throw new IOException(e);
 +    }
 +  }
 +
 +  /**
 +   * Returns the {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} for the configuration based on the properties set using the single-table
 +   * input methods.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop instance for which to retrieve the configuration
 +   * @return the config object built from the single input table properties set on the job
 +   * @since 1.6.0
 +   */
 +  protected static Map.Entry<String,InputTableConfig> getDefaultInputTableConfig(Class<?> implementingClass, Configuration conf) {
 +    String tableName = getInputTableName(implementingClass, conf);
 +    if (tableName != null) {
 +      InputTableConfig queryConfig = new InputTableConfig();
 +      List<IteratorSetting> itrs = getIterators(implementingClass, conf);
 +      if (itrs != null)
 +        queryConfig.setIterators(itrs);
 +      Set<Pair<Text,Text>> columns = getFetchedColumns(implementingClass, conf);
 +      if (columns != null)
 +        queryConfig.fetchColumns(columns);
 +      List<Range> ranges = null;
 +      try {
 +        ranges = getRanges(implementingClass, conf);
 +      } catch (IOException e) {
 +        throw new RuntimeException(e);
 +      }
 +      if (ranges != null)
 +        queryConfig.setRanges(ranges);
 +
 +      queryConfig.setAutoAdjustRanges(getAutoAdjustRanges(implementingClass, conf)).setUseIsolatedScanners(isIsolated(implementingClass, conf))
 +          .setUseLocalIterators(usesLocalIterators(implementingClass, conf)).setOfflineScan(isOfflineScan(implementingClass, conf));
 +      return Maps.immutableEntry(tableName, queryConfig);
 +    }
 +    return null;
 +  }
 +
 +  public static Map<String,Map<KeyExtent,List<Range>>> binOffline(String tableId, List<Range> ranges, Instance instance, Connector conn)
 +      throws AccumuloException, TableNotFoundException {
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 +
 +    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
 +      Tables.clearCache(instance);
 +      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
 +        throw new AccumuloException("Table is online tableId:" + tableId + " cannot scan table in offline mode ");
 +      }
 +    }
 +
 +    for (Range range : ranges) {
 +      Text startRow;
 +
 +      if (range.getStartKey() != null)
 +        startRow = range.getStartKey().getRow();
 +      else
 +        startRow = new Text();
 +
 +      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
 +      Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +      MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
 +      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
 +      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
 +      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
 +      scanner.setRange(metadataRange);
 +
 +      RowIterator rowIter = new RowIterator(scanner);
 +      KeyExtent lastExtent = null;
 +      while (rowIter.hasNext()) {
 +        Iterator<Map.Entry<Key,Value>> row = rowIter.next();
 +        String last = "";
 +        KeyExtent extent = null;
 +        String location = null;
 +
 +        while (row.hasNext()) {
 +          Map.Entry<Key,Value> entry = row.next();
 +          Key key = entry.getKey();
 +
 +          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
 +            last = entry.getValue().toString();
 +          }
 +
 +          if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
 +              || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
 +            location = entry.getValue().toString();
 +          }
 +
 +          if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
 +            extent = new KeyExtent(key.getRow(), entry.getValue());
 +          }
 +
 +        }
 +
 +        if (location != null)
 +          return null;
 +
 +        if (!extent.getTableId().toString().equals(tableId)) {
 +          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
 +        }
 +
 +        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
 +          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
 +        }
 +
 +        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
 +        if (tabletRanges == null) {
 +          tabletRanges = new HashMap<KeyExtent,List<Range>>();
 +          binnedRanges.put(last, tabletRanges);
 +        }
 +
 +        List<Range> rangeList = tabletRanges.get(extent);
 +        if (rangeList == null) {
 +          rangeList = new ArrayList<Range>();
 +          tabletRanges.put(extent, rangeList);
 +        }
 +
 +        rangeList.add(range);
 +
 +        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
 +          break;
 +        }
 +
 +        lastExtent = extent;
 +      }
 +
 +    }
 +    return binnedRanges;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/OutputConfigurator.java
----------------------------------------------------------------------
diff --cc mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/OutputConfigurator.java
index 727971a,0000000..13b67d5
mode 100644,000000..100644
--- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/OutputConfigurator.java
+++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/OutputConfigurator.java
@@@ -1,204 -1,0 +1,204 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mapreduce.lib.impl;
 +
++import static java.nio.charset.StandardCharsets.UTF_8;
++
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- 
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.hadoop.conf.Configuration;
 +
 +/**
 + * @since 1.6.0
 + */
 +public class OutputConfigurator extends ConfiguratorBase {
 +
 +  /**
 +   * Configuration keys for {@link BatchWriter}.
 +   * 
 +   * @since 1.6.0
 +   */
 +  public static enum WriteOpts {
 +    DEFAULT_TABLE_NAME, BATCH_WRITER_CONFIG
 +  }
 +
 +  /**
 +   * Configuration keys for various features.
 +   * 
 +   * @since 1.6.0
 +   */
 +  public static enum Features {
 +    CAN_CREATE_TABLES, SIMULATION_MODE
 +  }
 +
 +  /**
 +   * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and
 +   * underscores.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param tableName
 +   *          the table to use when the tablename is null in the write call
 +   * @since 1.6.0
 +   */
 +  public static void setDefaultTableName(Class<?> implementingClass, Configuration conf, String tableName) {
 +    if (tableName != null)
 +      conf.set(enumToConfKey(implementingClass, WriteOpts.DEFAULT_TABLE_NAME), tableName);
 +  }
 +
 +  /**
 +   * Gets the default table name from the configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the default table name
 +   * @since 1.6.0
 +   * @see #setDefaultTableName(Class, Configuration, String)
 +   */
 +  public static String getDefaultTableName(Class<?> implementingClass, Configuration conf) {
 +    return conf.get(enumToConfKey(implementingClass, WriteOpts.DEFAULT_TABLE_NAME));
 +  }
 +
 +  /**
 +   * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults, is
 +   * used. Setting the configuration multiple times overwrites any previous configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param bwConfig
 +   *          the configuration for the {@link BatchWriter}
 +   * @since 1.6.0
 +   */
 +  public static void setBatchWriterOptions(Class<?> implementingClass, Configuration conf, BatchWriterConfig bwConfig) {
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    String serialized;
 +    try {
 +      bwConfig.write(new DataOutputStream(baos));
-       serialized = new String(baos.toByteArray(), StandardCharsets.UTF_8);
++      serialized = new String(baos.toByteArray(), UTF_8);
 +      baos.close();
 +    } catch (IOException e) {
 +      throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName());
 +    }
 +    conf.set(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG), serialized);
 +  }
 +
 +  /**
 +   * Gets the {@link BatchWriterConfig} settings.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the configuration object
 +   * @since 1.6.0
 +   * @see #setBatchWriterOptions(Class, Configuration, BatchWriterConfig)
 +   */
 +  public static BatchWriterConfig getBatchWriterOptions(Class<?> implementingClass, Configuration conf) {
 +    String serialized = conf.get(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG));
 +    BatchWriterConfig bwConfig = new BatchWriterConfig();
 +    if (serialized == null || serialized.isEmpty()) {
 +      return bwConfig;
 +    } else {
 +      try {
-         ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(StandardCharsets.UTF_8));
++        ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(UTF_8));
 +        bwConfig.readFields(new DataInputStream(bais));
 +        bais.close();
 +        return bwConfig;
 +      } catch (IOException e) {
 +        throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName());
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores.
 +   * 
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.6.0
 +   */
 +  public static void setCreateTables(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.CAN_CREATE_TABLES), enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether tables are permitted to be created as needed.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.6.0
 +   * @see #setCreateTables(Class, Configuration, boolean)
 +   */
 +  public static Boolean canCreateTables(Class<?> implementingClass, Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, Features.CAN_CREATE_TABLES), false);
 +  }
 +
 +  /**
 +   * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing.
 +   * 
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.6.0
 +   */
 +  public static void setSimulationMode(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.SIMULATION_MODE), enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether simulation mode is enabled.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.6.0
 +   * @see #setSimulationMode(Class, Configuration, boolean)
 +   */
 +  public static Boolean getSimulationMode(Class<?> implementingClass, Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, Features.SIMULATION_MODE), false);
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
----------------------------------------------------------------------
diff --cc mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
index 2950091,0000000..c0c0097
mode 100644,000000..100644
--- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
+++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
@@@ -1,135 -1,0 +1,136 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mapreduce.lib.partition;
 +
++import static java.nio.charset.StandardCharsets.UTF_8;
++
 +import java.io.BufferedReader;
 +import java.io.FileInputStream;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.net.URI;
- import java.nio.charset.StandardCharsets;
 +import java.util.Arrays;
 +import java.util.Scanner;
 +import java.util.TreeSet;
 +
 +import org.apache.accumulo.core.client.mapreduce.lib.impl.DistributedCacheHelper;
 +import org.apache.accumulo.core.util.Base64;
 +import org.apache.hadoop.conf.Configurable;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Partitioner;
 +
 +/**
 + * Hadoop partitioner that uses ranges, and optionally sub-bins based on hashing.
 + */
 +public class RangePartitioner extends Partitioner<Text,Writable> implements Configurable {
 +  private static final String PREFIX = RangePartitioner.class.getName();
 +  private static final String CUTFILE_KEY = PREFIX + ".cutFile";
 +  private static final String NUM_SUBBINS = PREFIX + ".subBins";
 +
 +  private Configuration conf;
 +
 +  @Override
 +  public int getPartition(Text key, Writable value, int numPartitions) {
 +    try {
 +      return findPartition(key, getCutPoints(), getNumSubBins());
 +    } catch (IOException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  int findPartition(Text key, Text[] array, int numSubBins) {
 +    // find the bin for the range, and guarantee it is positive
 +    int index = Arrays.binarySearch(array, key);
 +    index = index < 0 ? (index + 1) * -1 : index;
 +
 +    // both conditions work with numSubBins == 1, but this check is to avoid
 +    // hashing, when we don't need to, for speed
 +    if (numSubBins < 2)
 +      return index;
 +    return (key.toString().hashCode() & Integer.MAX_VALUE) % numSubBins + index * numSubBins;
 +  }
 +
 +  private int _numSubBins = 0;
 +
 +  private synchronized int getNumSubBins() {
 +    if (_numSubBins < 1) {
 +      // get number of sub-bins and guarantee it is positive
 +      _numSubBins = Math.max(1, getConf().getInt(NUM_SUBBINS, 1));
 +    }
 +    return _numSubBins;
 +  }
 +
 +  private Text cutPointArray[] = null;
 +
 +  private synchronized Text[] getCutPoints() throws IOException {
 +    if (cutPointArray == null) {
 +      String cutFileName = conf.get(CUTFILE_KEY);
 +      Path[] cf = DistributedCacheHelper.getLocalCacheFiles(conf);
 +
 +      if (cf != null) {
 +        for (Path path : cf) {
 +          if (path.toUri().getPath().endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) {
 +            TreeSet<Text> cutPoints = new TreeSet<Text>();
-             Scanner in = new Scanner(new BufferedReader(new InputStreamReader(new FileInputStream(path.toString()), StandardCharsets.UTF_8)));
++            Scanner in = new Scanner(new BufferedReader(new InputStreamReader(new FileInputStream(path.toString()), UTF_8)));
 +            try {
 +              while (in.hasNextLine())
-                 cutPoints.add(new Text(Base64.decodeBase64(in.nextLine().getBytes(StandardCharsets.UTF_8))));
++                cutPoints.add(new Text(Base64.decodeBase64(in.nextLine().getBytes(UTF_8))));
 +            } finally {
 +              in.close();
 +            }
 +            cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
 +            break;
 +          }
 +        }
 +      }
 +      if (cutPointArray == null)
 +        throw new FileNotFoundException(cutFileName + " not found in distributed cache");
 +    }
 +    return cutPointArray;
 +  }
 +
 +  @Override
 +  public Configuration getConf() {
 +    return conf;
 +  }
 +
 +  @Override
 +  public void setConf(Configuration conf) {
 +    this.conf = conf;
 +  }
 +
 +  /**
 +   * Sets the HDFS file name to use, containing a newline-separated list of Base64-encoded split points that represent ranges for partitioning.
 +   */
 +  public static void setSplitFile(Job job, String file) {
 +    URI uri = new Path(file).toUri();
 +    DistributedCacheHelper.addCacheFile(uri, job.getConfiguration());
 +    job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
 +  }
 +
 +  /**
 +   * Sets the number of random sub-bins per range
 +   */
 +  public static void setNumSubBins(Job job, int num) {
 +    job.getConfiguration().setInt(NUM_SUBBINS, num);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
----------------------------------------------------------------------
diff --cc minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
index 0febb58,8216441..aa10276
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
@@@ -16,6 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.minicluster.impl;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.BufferedReader;
  import java.io.BufferedWriter;
  import java.io.File;
@@@ -105,8 -108,6 +107,7 @@@ import org.apache.zookeeper.server.ZooK
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import com.google.common.base.Charsets;
 +import com.google.common.base.Joiner;
  import com.google.common.base.Predicate;
  import com.google.common.collect.Maps;
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
----------------------------------------------------------------------
diff --cc proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
index a778add,30da0cba..bd0782d
--- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
@@@ -16,8 -16,9 +16,9 @@@
   */
  package org.apache.accumulo.proxy;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.nio.ByteBuffer;
- import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/proxy/src/main/java/org/apache/accumulo/proxy/TestProxyClient.java
----------------------------------------------------------------------
diff --cc proxy/src/main/java/org/apache/accumulo/proxy/TestProxyClient.java
index 5f84c2f,ec1c9d2..4850c19
--- a/proxy/src/main/java/org/apache/accumulo/proxy/TestProxyClient.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/TestProxyClient.java
@@@ -16,8 -16,9 +16,9 @@@
   */
  package org.apache.accumulo.proxy;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.nio.ByteBuffer;
- import java.nio.charset.StandardCharsets;
  import java.util.Collections;
  import java.util.Date;
  import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/proxy/src/main/java/org/apache/accumulo/proxy/Util.java
----------------------------------------------------------------------
diff --cc proxy/src/main/java/org/apache/accumulo/proxy/Util.java
index 94b0709,7333539..bc0db72
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Util.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Util.java
@@@ -16,9 -16,10 +16,10 @@@
   */
  package org.apache.accumulo.proxy;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.math.BigInteger;
  import java.nio.ByteBuffer;
- import java.nio.charset.StandardCharsets;
  import java.util.Random;
  
  import org.apache.accumulo.proxy.thrift.IteratorSetting;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index e4d6264,6c40094..5c93a53
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@@ -16,6 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.File;
  import java.io.FileInputStream;
  import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
index 7169215,05f3c6c..efbd882
--- a/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@@ -16,9 -16,10 +16,10 @@@
   */
  package org.apache.accumulo.server.client;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.IOException;
  import java.nio.ByteBuffer;
- import java.nio.charset.StandardCharsets;
  import java.util.Collections;
  import java.util.List;
  import java.util.UUID;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessor.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessor.java
index a154fdc,18feb4f..2ad1c3d
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessor.java
@@@ -16,7 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.conf;
  
- import java.nio.charset.StandardCharsets;
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.util.List;
  import java.util.Map;
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
index 0e9ee95,6d25f1d..c472581
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
@@@ -16,7 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.conf;
  
- import java.nio.charset.StandardCharsets;
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.List;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
index 9d56265,587e38a..e4f61f9
--- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@@ -16,7 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.constraints;
  
- import java.nio.charset.StandardCharsets;
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 1fd9aa3,7c4b8d3..45e883d
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@@ -16,15 -16,14 +16,15 @@@
   */
  package org.apache.accumulo.server.init;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
  import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
  import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
  import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
  
  import java.io.FileNotFoundException;
  import java.io.IOException;
- import java.nio.charset.StandardCharsets;
  import java.util.Arrays;
 +import java.util.Collections;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Locale;
@@@ -412,8 -404,7 +412,8 @@@ public class Initialize 
      zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
      zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
      zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
-     zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(StandardCharsets.UTF_8),
 -    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(UTF_8), NodeExistsPolicy.FAIL);
++    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(UTF_8),
 +        NodeExistsPolicy.FAIL);
      zoo.putPersistentData(zkInstanceRoot + Constants.ZGC, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
      zoo.putPersistentData(zkInstanceRoot + Constants.ZGC_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
      zoo.putPersistentData(zkInstanceRoot + Constants.ZCONFIG, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index 4a66db7,002389a..e577153
--- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@@ -16,6 -16,7 +16,7 @@@
   */
  package org.apache.accumulo.server.master;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
  import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP;
  
  import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
index 0e1f5ee,568e6f6..3f7f167
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
@@@ -16,7 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.master.state;
  
- import java.nio.charset.StandardCharsets;
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.util.ArrayList;
  import java.util.List;
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
index 8a09e75,b4ef519..f473ba3
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
@@@ -16,12 -16,13 +16,12 @@@
   */
  package org.apache.accumulo.server.master.state;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.IOException;
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import java.io.Serializable;
- import java.nio.charset.StandardCharsets;
--
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
@@@ -127,8 -120,8 +127,8 @@@ public class TServerInstance implement
      return new Text(this.getSession());
    }
    
 -  public Value asMutationValue() {
 +  private Value asMutationValue() {
-     return new Value(getLocation().toString().getBytes(StandardCharsets.UTF_8));
+     return new Value(getLocation().toString().getBytes(UTF_8));
    }
    
    public HostAndPort getLocation() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
index e06b58e,91be9c3..2a84e70
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
@@@ -16,8 -16,9 +16,9 @@@
   */
  package org.apache.accumulo.server.master.state;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.IOException;
- import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
index 6ab4683,8f0c9c7..1bcf482
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
@@@ -16,8 -16,9 +16,9 @@@
   */
  package org.apache.accumulo.server.master.state;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.IOException;
- import java.nio.charset.StandardCharsets;
  import java.util.List;
  
  import org.apache.accumulo.core.zookeeper.ZooUtil;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index e7f06dc,15ade65..5481531
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@@ -16,8 -16,9 +16,9 @@@
   */
  package org.apache.accumulo.server.master.state;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.IOException;
- import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.Iterator;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
index 05ede64,409fa62..54ca8de
--- a/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
@@@ -16,6 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.metrics;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
index 902c063,4e909c0..930b634
--- a/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
@@@ -16,6 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.monitor;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.IOException;
  import java.net.ServerSocket;
  import java.net.Socket;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
index 590cbe4,0000000..8e9612a
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
@@@ -1,274 -1,0 +1,275 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.replication;
 +
- import java.nio.charset.StandardCharsets;
++import static java.nio.charset.StandardCharsets.UTF_8;
++
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.NoSuchElementException;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.client.replication.ReplicaSystem;
 +import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.replication.ReplicationConstants;
 +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 +import org.apache.accumulo.core.replication.ReplicationTarget;
 +import org.apache.accumulo.core.replication.StatusUtil;
 +import org.apache.accumulo.core.replication.proto.Replication.Status;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.server.zookeeper.ZooCache;
 +import org.apache.hadoop.io.Text;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Maps;
 +import com.google.protobuf.InvalidProtocolBufferException;
 +
 +public class ReplicationUtil {
 +  private static final Logger log = LoggerFactory.getLogger(ReplicationUtil.class);
 +
 +  private static final String REPLICATION_TARGET_PREFIX = Property.TABLE_REPLICATION_TARGET.getKey();
 +
 +  private ZooCache zooCache;
 +
 +  public ReplicationUtil() { // no-arg form: delegates to the ZooCache constructor with a newly created ZooCache
 +    this(new ZooCache());
 +  }
 +
 +  public ReplicationUtil(ZooCache cache) { // stores the supplied cache; getAbsolutePath reads work-queue entries through it
 +    this.zooCache = cache;
 +  }
 +
 +  public int getMaxReplicationThreads(Map<String,String> systemProperties, MasterMonitorInfo mmi) { // tserver count times configured worker threads per tserver
 +    int activeTservers = mmi.getTServerInfoSize();
 +
 +    // The number of threads each tserver will use at most to replicate data
 +    int replicationThreadsPerServer = Integer.parseInt(systemProperties.get(Property.REPLICATION_WORKER_THREADS.getKey())); // NOTE(review): throws NumberFormatException if the property is absent or not an integer
 +
 +    // The total number of "slots" we have to replicate data
 +    return activeTservers * replicationThreadsPerServer;
 +  }
 +
 +  public int getMaxReplicationThreads(Connector conn, MasterMonitorInfo mmi) throws AccumuloException, AccumuloSecurityException { // convenience overload taking a Connector
 +    return getMaxReplicationThreads(conn.instanceOperations().getSystemConfiguration(), mmi); // read the system configuration via the connector, then delegate to the Map overload
 +  }
 +
 +  /**
 +   * Extract replication peers from system configuration, mapping each peer name to the class name of the ReplicaSystem created for it
 +   * @param systemProperties System properties, typically from Connector.instanceOperations().getSystemConfiguration()
 +   * @return Configured replication peers (peer name to ReplicaSystem class name); a peer whose ReplicaSystem cannot be created is logged and left out
 +   */
 +  public Map<String,String> getPeers(Map<String,String> systemProperties) {
 +    Map<String,String> peers = new HashMap<>();
 +    String definedPeersPrefix = Property.REPLICATION_PEERS.getKey();
 +
 +    // Get the defined peers and what ReplicaSystem impl they're using
 +    for (Entry<String,String> property : systemProperties.entrySet()) {
 +      String key = property.getKey();
 +      // Keep only keys under the peers prefix, excluding the per-peer user and password properties
 +      if (key.startsWith(definedPeersPrefix) && !key.startsWith(Property.REPLICATION_PEER_USER.getKey()) && !key.startsWith(Property.REPLICATION_PEER_PASSWORD.getKey())) {
 +        String peerName = property.getKey().substring(definedPeersPrefix.length()); // the peer name is whatever follows the prefix
 +        ReplicaSystem replica;
 +        try {
 +         replica = ReplicaSystemFactory.get(property.getValue());
 +        } catch (Exception e) {
 +          log.warn("Could not instantiate ReplicaSystem for {} with configuration {}", property.getKey(), property.getValue(), e);
 +          continue; // best effort: skip this peer and keep processing the remaining properties
 +        }
 +
 +        peers.put(peerName, replica.getClass().getName());
 +      }
 +    }
 +
 +    return peers;
 +  }
 +
 +  public Set<ReplicationTarget> getReplicationTargets(TableOperations tops) { // collects one ReplicationTarget per replication-target property found on each listed table
 +    // The total set of configured targets
 +    final Set<ReplicationTarget> allConfiguredTargets = new HashSet<>();
 +    final Map<String,String> tableNameToId = tops.tableIdMap();
 +
 +    for (String table : tops.list()) {
 +      if (MetadataTable.NAME.equals(table) || RootTable.NAME.equals(table)) { // the metadata and root tables are skipped
 +        continue;
 +      }
 +      String localId = tableNameToId.get(table);
 +      if (null == localId) { // name was listed but has no entry in the id map; skip it
 +        log.trace("Could not determine ID for {}", table);
 +        continue;
 +      }
 +
 +      Iterable<Entry<String,String>> propertiesForTable;
 +      try {
 +        propertiesForTable = tops.getProperties(table);
 +      } catch (AccumuloException e) { // best effort: a table whose properties cannot be read is skipped
 +        log.debug("Could not fetch properties for " + table, e);
 +        continue;
 +      } catch (TableNotFoundException e) { // same handling as above
 +        log.debug("Could not fetch properties for " + table, e);
 +        continue;
 +      }
 +
 +      for (Entry<String,String> prop : propertiesForTable) {
 +        if (prop.getKey().startsWith(REPLICATION_TARGET_PREFIX)) {
 +          String peerName = prop.getKey().substring(REPLICATION_TARGET_PREFIX.length()); // key suffix after the prefix names the peer
 +          String remoteIdentifier = prop.getValue(); // property value is passed through as the remote identifier
 +          ReplicationTarget target = new ReplicationTarget(peerName, remoteIdentifier, localId);
 +
 +          allConfiguredTargets.add(target);
 +        }
 +      }
 +    }
 +
 +    return allConfiguredTargets;
 +  }
 +
 +  public Map<ReplicationTarget,Long> getPendingReplications(Connector conn) { // counts scanned work entries per ReplicationTarget
 +    final Map<ReplicationTarget,Long> counts = new HashMap<>();
 +
 +    // Read over the queued work
 +    BatchScanner bs;
 +    try {
 +      bs = conn.createBatchScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY, 4);
 +    } catch (TableNotFoundException e) {
 +      log.debug("No replication table exists", e);
 +      return counts; // no table: the empty map is returned
 +    }
 +
 +    bs.setRanges(Collections.singleton(new Range()));
 +    WorkSection.limit(bs); // presumably restricts the scan to work-section entries — confirm against WorkSection
 +    try {
 +      Text buffer = new Text(); // one Text instance reused for every entry
 +      for (Entry<Key,Value> entry : bs) {
 +        Key k = entry.getKey();
 +        k.getColumnQualifier(buffer);
 +        ReplicationTarget target = ReplicationTarget.from(buffer); // the column qualifier is parsed as the target
 +
 +        // TODO ACCUMULO-2835 once explicit lengths are tracked, we can give size-based estimates instead of just file-based
 +        Long count = counts.get(target);
 +        if (null == count) {
 +          counts.put(target, Long.valueOf(1l));
 +        } else {
 +          counts.put(target, count + 1);
 +        }
 +      }
 +    } finally {
 +      bs.close(); // the scanner is closed even if iteration throws
 +    }
 +
 +    return counts;
 +  }
 +
 +  /**
 +   * Fetches the absolute path of the file to be replicated: the bytes cached under workQueuePath + "/" + queueKey, decoded as UTF-8.
 +   * @param conn Accumulo Connector (not referenced by the method body)
 +   * @param workQueuePath Root path for the Replication WorkQueue
 +   * @param queueKey The Replication work queue key
 +   * @return The absolute path for the file, or null if the cache holds no data for the key (i.e. it is no longer in ZooKeeper)
 +   */
 +  public String getAbsolutePath(Connector conn, String workQueuePath, String queueKey) {
 +    byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
 +    if (null != data) {
-       return new String(data, StandardCharsets.UTF_8);
++      return new String(data, UTF_8);
 +    }
 +
 +    return null;
 +  }
 +
 +  /**
 +   * Compute a progress string for the replication of the given WAL by reading its work entry for the target from the replication table
 +   * @param conn Accumulo Connector
 +   * @param path Absolute path to a WAL, or null
 +   * @param target ReplicationTarget the WAL is being replicated to
 +   * @return A status message for a file being replicated: "Finished", a begin/end record count, or "Unknown" when no status can be determined
 +   */
 +  public String getProgress(Connector conn, String path, ReplicationTarget target) {
 +    // We could try to grep over the table, but without knowing the full file path, we
 +    // can't find the status quickly
 +    String status = "Unknown";
 +    if (null != path) {
 +      Scanner s;
 +      try {
 +        s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY);
 +      } catch (TableNotFoundException e) {
 +        log.debug("Replication table no long exists", e);
 +        return status;
 +      }
 +
 +      s.setRange(Range.exact(path)); // the WAL path is used as the exact row to look up
 +      s.fetchColumn(WorkSection.NAME, target.toText());
 +
 +      // Fetch the work entry for this item
 +      Entry<Key,Value> kv = null;
 +      try {
 +        kv = Iterables.getOnlyElement(s); // NOTE(review): more than one entry raises IllegalArgumentException, which is not caught here — confirm that cannot occur
 +      } catch (NoSuchElementException e) {
 +       log.trace("Could not find status of {} replicating to {}", path, target);
 +       status = "Unknown";
 +      } finally {
 +        s.close();
 +      }
 +
 +      // If we found the work entry for it, try to compute some progress
 +      if (null != kv) {
 +        try {
 +          Status stat = Status.parseFrom(kv.getValue().get());
 +          if (StatusUtil.isFullyReplicated(stat)) {
 +            status = "Finished";
 +          } else {
 +            if (stat.getInfiniteEnd()) {
 +              status = stat.getBegin() + "/&infin; records"; // the string carries an HTML entity, so it is presumably rendered in a web page — confirm
 +            } else {
 +              status = stat.getBegin() + "/" + stat.getEnd() + " records";
 +            }
 +          }
 +        } catch (InvalidProtocolBufferException e) {
 +          log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
 +          status = "Unknown";
 +        }
 +      }
 +    }
 +
 +    return status;
 +  }
 +
 +  public Map<String,String> invert(Map<String,String> map) { // returns a new map with keys and values swapped; the input map is not modified
 +    Map<String,String> newMap = Maps.newHashMapWithExpectedSize(map.size());
 +    for(Entry<String,String> entry : map.entrySet()) {
 +      newMap.put(entry.getValue(), entry.getKey()); // if several keys share a value, the one iterated last wins
 +    }
 +    return newMap;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index 08407e4,258080c..fa90b3e
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@@ -16,8 -16,9 +16,9 @@@
   */
  package org.apache.accumulo.server.security;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.nio.ByteBuffer;
- import java.nio.charset.StandardCharsets;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
index 7989447,872fe11..33a5158
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
@@@ -16,6 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.security;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.io.ByteArrayOutputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
@@@ -117,14 -118,14 +118,14 @@@ public final class SystemCredentials ex
        }
  
        // seed the config with the version and instance id, so at least it's not empty
-       md.update(ServerConstants.WIRE_VERSION.toString().getBytes(StandardCharsets.UTF_8));
+       md.update(ServerConstants.WIRE_VERSION.toString().getBytes(UTF_8));
        md.update(instanceIdBytes);
  
 -      for (Entry<String,String> entry : ServerConfiguration.getSiteConfiguration()) {
 +      for (Entry<String,String> entry : SiteConfiguration.getInstance()) {
          // only include instance properties
          if (entry.getKey().startsWith(Property.INSTANCE_PREFIX.toString())) {
-           md.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
-           md.update(entry.getValue().getBytes(StandardCharsets.UTF_8));
+           md.update(entry.getKey().getBytes(UTF_8));
+           md.update(entry.getValue().getBytes(UTF_8));
          }
        }
        confChecksum = md.digest();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
index d46d623,fa1632c..afe21f2
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
@@@ -16,7 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.security.handler;
  
- import java.nio.charset.StandardCharsets;
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.util.HashSet;
  import java.util.Set;
  import java.util.TreeSet;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
index bf22d1a,78597ad..0a1bd6c
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
@@@ -16,8 -16,9 +16,9 @@@
   */
  package org.apache.accumulo.server.security.handler;
  
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.nio.ByteBuffer;
- import java.nio.charset.StandardCharsets;
  import java.util.Collection;
  import java.util.Collections;
  import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
index 383c34e,41f880d..c03c40b
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
@@@ -16,7 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.security.handler;
  
- import java.nio.charset.StandardCharsets;
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.Map;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
index 7d27909,78f8c7b..886c28f
--- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
@@@ -16,7 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.tables;
  
- import java.nio.charset.StandardCharsets;
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.security.SecurityPermission;
  import java.util.Collections;
  import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
index 38a96d4,e10c0c1..8bf4dc8
--- a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
@@@ -16,7 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.tablets;
  
- import java.nio.charset.StandardCharsets;
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.util.Random;
  
  import org.apache.accumulo.core.Constants;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cfbdef13/server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
index 7a77879,198b2d6..fb4cba0
--- a/server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
@@@ -16,7 -16,8 +16,8 @@@
   */
  package org.apache.accumulo.server.util;
  
- import java.nio.charset.StandardCharsets;
 -import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
+ 
  import java.util.HashSet;
  import java.util.Map.Entry;
  import java.util.Set;


Mime
View raw message