accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [22/24] accumulo git commit: Merge branch '1.7' into 1.8
Date Tue, 25 Jul 2017 23:03:10 GMT
Merge branch '1.7' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/018c7fe5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/018c7fe5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/018c7fe5

Branch: refs/heads/1.8
Commit: 018c7fe55c1d2f0d960492a214d4eaf069ec60da
Parents: 0f061b9 465f6c4
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Tue Jul 25 18:25:16 2017 -0400
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Tue Jul 25 18:25:16 2017 -0400

----------------------------------------------------------------------
 .../accumulo/core/client/BatchScanner.java      |  4 +-
 .../accumulo/core/client/IsolatedScanner.java   |  4 +-
 .../accumulo/core/client/impl/MasterClient.java |  2 +-
 .../client/impl/NamespaceOperationsImpl.java    |  2 +-
 .../core/client/impl/RootTabletLocator.java     |  4 +-
 .../accumulo/core/client/impl/ServerClient.java |  3 +-
 .../core/client/impl/SyncingTabletLocator.java  |  2 +-
 .../core/client/impl/TableOperationsImpl.java   |  2 +-
 .../core/client/impl/TabletLocatorImpl.java     |  4 +-
 .../client/impl/TabletServerBatchWriter.java    |  2 +-
 .../core/client/impl/TimeoutTabletLocator.java  |  6 +--
 .../core/client/mapred/AbstractInputFormat.java |  4 +-
 .../client/mapreduce/AbstractInputFormat.java   |  4 +-
 .../core/client/mapreduce/RangeInputSplit.java  |  2 +-
 .../core/client/mock/MockTableOperations.java   |  3 +-
 .../java/org/apache/accumulo/core/data/Key.java | 15 +++---
 .../accumulo/core/file/rfile/bcfile/BCFile.java |  8 +--
 .../core/file/streams/PositionedOutputs.java    |  1 +
 .../file/streams/RateLimitedInputStream.java    |  1 +
 .../file/streams/RateLimitedOutputStream.java   |  1 +
 .../file/streams/SeekableDataInputStream.java   |  1 +
 .../iterators/system/LocalityGroupIterator.java |  3 +-
 .../system/SourceSwitchingIterator.java         |  3 +-
 .../core/iterators/user/CfCqSliceOpts.java      |  6 +--
 .../iterators/user/CfCqSliceSeekingFilter.java  |  6 +--
 .../iterators/user/RowEncodingIterator.java     |  2 +-
 .../core/iterators/user/SeekingFilter.java      | 10 ++--
 .../accumulo/core/trace/CountSampler.java       |  4 +-
 .../accumulo/core/trace/ProbabilitySampler.java |  4 +-
 .../org/apache/accumulo/core/trace/Span.java    |  4 +-
 .../apache/accumulo/core/util/CreateToken.java  |  4 +-
 .../accumulo/core/util/LocalityGroupUtil.java   |  4 +-
 .../core/util/format/BinaryFormatter.java       |  1 +
 .../core/util/format/DateStringFormatter.java   |  1 +
 .../core/util/format/DefaultFormatter.java      |  1 +
 .../ratelimit/SharedRateLimiterFactory.java     |  4 +-
 .../org/apache/accumulo/core/cli/TestHelp.java  |  3 +-
 .../accumulo/core/client/TestThrift1474.java    |  3 +-
 .../lexicoder/BigIntegerLexicoderTest.java      |  4 +-
 .../client/lexicoder/DateLexicoderTest.java     |  4 +-
 .../client/lexicoder/DoubleLexicoderTest.java   |  4 +-
 .../client/lexicoder/FloatLexicoderTest.java    |  4 +-
 .../client/lexicoder/IntegerLexicoderTest.java  |  4 +-
 .../core/client/lexicoder/LexicoderTest.java    |  9 ++--
 .../client/lexicoder/LongLexicoderTest.java     |  4 +-
 .../client/lexicoder/PairLexicoderTest.java     |  4 +-
 .../client/lexicoder/ReverseLexicoderTest.java  |  6 +--
 .../client/lexicoder/ULongLexicoderTest.java    |  4 +-
 .../client/lexicoder/UUIDLexicoderTest.java     |  4 +-
 .../apache/accumulo/core/data/RangeTest.java    |  4 +-
 .../core/file/rfile/MultiLevelIndexTest.java    |  3 +-
 .../core/file/rfile/MultiThreadedRFileTest.java |  8 +--
 .../core/file/streams/MockRateLimiter.java      |  1 +
 .../streams/RateLimitedInputStreamTest.java     |  1 +
 .../streams/RateLimitedOutputStreamTest.java    |  6 ++-
 .../ColumnFamilySkippingIteratorTest.java       |  4 +-
 .../core/iterators/system/ColumnFilterTest.java |  4 +-
 .../iterators/system/DeletingIteratorTest.java  |  4 +-
 .../system/SourceSwitchingIteratorTest.java     |  7 +--
 .../system/TimeSettingIteratorTest.java         |  8 +--
 .../iterators/system/VisibilityFilterTest.java  |  4 +-
 .../iterators/user/IndexedDocIteratorTest.java  |  4 +-
 .../core/iterators/user/LargeRowFilterTest.java |  4 +-
 .../user/WholeColumnFamilyIteratorTest.java     |  4 +-
 .../core/security/VisibilityConstraintTest.java |  2 +-
 .../apache/accumulo/core/util/TextUtilTest.java |  4 +-
 .../util/format/DateFormatSupplierTest.java     |  1 +
 .../util/format/DateStringFormatterTest.java    |  1 +
 .../core/util/format/DefaultFormatterTest.java  |  1 +
 .../core/util/format/FormatterConfigTest.java   |  1 +
 .../simple/filedata/ChunkCombinerTest.java      |  4 +-
 .../examples/simple/filedata/KeyUtilTest.java   |  4 +-
 .../accumulo/fate/util/AddressUtilTest.java     |  4 +-
 .../testcases/YieldingTestCase.java             |  6 +--
 .../impl/MiniAccumuloClusterControl.java        |  4 +-
 .../StandaloneClusterControlTest.java           |  2 +-
 pom.xml                                         | 51 +++++++++++++++++++-
 .../org/apache/accumulo/server/Accumulo.java    |  3 +-
 .../accumulo/server/client/HdfsZooInstance.java |  3 +-
 .../accumulo/server/fs/VolumeManager.java       |  2 +-
 .../accumulo/server/master/LiveTServerSet.java  |  2 +-
 .../balancer/HostRegexTableLoadBalancer.java    |  7 +--
 .../server/master/state/SuspendingTServer.java  |  7 ++-
 .../server/master/state/TabletStateStore.java   |  2 +-
 .../server/replication/StatusFormatter.java     |  1 +
 .../org/apache/accumulo/server/util/Halt.java   |  4 +-
 .../org/apache/accumulo/server/util/ZooZap.java |  2 +-
 .../server/master/LiveTServerSetTest.java       |  1 +
 .../security/handler/ZKAuthenticatorTest.java   |  4 +-
 .../org/apache/accumulo/master/MasterTime.java  |  8 +--
 .../DistributedWorkQueueWorkAssigner.java       |  3 +-
 .../replication/UnorderedWorkAssigner.java      |  4 +-
 .../accumulo/master/replication/WorkDriver.java |  4 +-
 .../accumulo/monitor/ZooKeeperStatus.java       |  2 +-
 .../accumulo/monitor/servlets/ShellServlet.java |  4 +-
 .../monitor/servlets/trace/NullScanner.java     |  2 +-
 .../apache/accumulo/tracer/TraceFormatter.java  |  1 +
 .../org/apache/accumulo/tracer/TraceServer.java |  3 +-
 .../apache/accumulo/tracer/TraceTableStats.java | 14 +++---
 .../org/apache/accumulo/tserver/MemKey.java     |  3 +-
 .../apache/accumulo/tserver/TabletServer.java   |  6 +--
 .../tserver/TabletServerResourceManager.java    |  2 +-
 .../apache/accumulo/shell/ShellCompletor.java   |  4 +-
 .../shell/commands/ClasspathCommand.java        |  4 +-
 .../accumulo/shell/commands/ConfigCommand.java  |  4 +-
 .../accumulo/shell/commands/GrepCommand.java    |  1 +
 .../accumulo/shell/commands/HistoryCommand.java |  4 +-
 .../accumulo/shell/commands/ScanCommand.java    |  1 +
 .../accumulo/shell/commands/TraceCommand.java   |  4 +-
 .../accumulo/shell/format/DeleterFormatter.java |  2 +-
 .../apache/accumulo/shell/mock/MockShell.java   |  4 +-
 .../apache/accumulo/shell/ShellConfigTest.java  |  4 +-
 .../accumulo/shell/ShellSetInstanceTest.java    |  4 +-
 .../shell/commands/CompactCommandTest.java      |  1 +
 .../shell/commands/DeleteAuthsCommandTest.java  |  4 +-
 .../shell/commands/DeleteManyCommandTest.java   |  1 +
 .../shell/commands/DeleteRowsCommandTest.java   |  1 +
 .../shell/commands/DropUserCommandTest.java     |  4 +-
 .../shell/commands/FlushCommandTest.java        |  1 +
 .../shell/commands/HistoryCommandTest.java      |  8 +--
 .../shell/commands/MergeCommandTest.java        |  1 +
 .../shell/commands/ScanCommandTest.java         |  1 +
 .../test/BadDeleteMarkersCreatedIT.java         |  4 +-
 .../test/BatchWriterInTabletServerIT.java       |  5 +-
 .../org/apache/accumulo/test/CleanWalIT.java    |  2 +-
 .../accumulo/test/CompactionRateLimitingIT.java |  1 +
 .../apache/accumulo/test/HardListIterator.java  | 14 +++---
 .../apache/accumulo/test/IMMLGBenchmark.java    |  3 +-
 .../apache/accumulo/test/MetaGetsReadersIT.java |  2 +-
 .../accumulo/test/MultiTableRecoveryIT.java     |  2 +-
 .../org/apache/accumulo/test/NamespacesIT.java  |  3 +-
 .../accumulo/test/NativeMapPerformanceTest.java |  3 +-
 .../accumulo/test/SplitCancelsMajCIT.java       |  3 +-
 .../apache/accumulo/test/SplitRecoveryIT.java   |  2 +-
 .../apache/accumulo/test/TableOperationsIT.java |  2 +-
 .../accumulo/test/TabletServerGivesUpIT.java    |  3 +-
 .../org/apache/accumulo/test/TotalQueuedIT.java |  2 +-
 .../test/TracerRecoversAfterOfflineTableIT.java |  3 +-
 .../apache/accumulo/test/YieldScannersIT.java   |  8 +--
 .../test/continuous/ContinuousBatchWalker.java  |  3 +-
 .../test/continuous/ContinuousScanner.java      |  2 +-
 .../accumulo/test/functional/AddSplitIT.java    |  3 +-
 .../test/functional/BadIteratorMincIT.java      |  2 +-
 .../test/functional/BatchScanSplitIT.java       |  3 +-
 .../functional/BulkSplitOptimizationIT.java     |  3 +-
 .../test/functional/CacheTestReader.java        |  3 +-
 .../test/functional/CacheTestWriter.java        |  3 +-
 .../accumulo/test/functional/ClassLoaderIT.java |  5 +-
 .../accumulo/test/functional/ConcurrencyIT.java |  2 +-
 .../accumulo/test/functional/ConstraintIT.java  |  3 +-
 .../test/functional/DeleteEverythingIT.java     |  2 +-
 .../test/functional/DeleteRowsSplitIT.java      |  3 +-
 .../test/functional/DynamicThreadPoolsIT.java   |  3 +-
 .../test/functional/GarbageCollectorIT.java     |  2 +-
 .../test/functional/HalfDeadTServerIT.java      |  3 +-
 .../accumulo/test/functional/LargeRowIT.java    |  3 +-
 .../accumulo/test/functional/MetadataIT.java    |  2 +-
 .../test/functional/MetadataMaxFilesIT.java     |  3 +-
 .../test/functional/MetadataSplitIT.java        |  3 +-
 .../accumulo/test/functional/ScanIdIT.java      |  3 +-
 .../test/functional/ScanIteratorIT.java         |  2 +-
 .../test/functional/ScanSessionTimeOutIT.java   |  3 +-
 .../test/functional/ServerSideErrorIT.java      |  4 +-
 .../accumulo/test/functional/ShutdownIT.java    |  3 +-
 .../test/functional/SlowConstraint.java         |  4 +-
 .../accumulo/test/functional/SlowIterator.java  |  4 +-
 .../test/functional/TableChangeStateIT.java     | 26 +++++-----
 .../accumulo/test/functional/TableIT.java       |  4 +-
 .../accumulo/test/functional/TimeoutIT.java     |  3 +-
 .../accumulo/test/functional/WALSunnyDayIT.java |  2 +-
 .../test/functional/YieldingIterator.java       | 11 ++---
 .../test/functional/ZookeeperRestartIT.java     |  3 +-
 .../CloseWriteAheadLogReferencesIT.java         |  3 +-
 .../accumulo/test/proxy/ProxyDurabilityIT.java  |  2 +-
 .../randomwalk/concurrent/OfflineTable.java     |  4 +-
 .../test/randomwalk/concurrent/Replication.java |  3 +-
 .../test/randomwalk/concurrent/Shutdown.java    |  4 +-
 .../test/randomwalk/concurrent/StartAll.java    |  4 +-
 .../test/randomwalk/shard/BulkInsert.java       |  3 +-
 .../test/replication/FinishedWorkUpdaterIT.java |  3 +-
 .../replication/MultiInstanceReplicationIT.java |  3 +-
 .../UnorderedWorkAssignerReplicationIT.java     |  3 +-
 .../accumulo/test/util/SerializationUtil.java   | 10 ++--
 .../scan/CollectTabletStatsTest.java            |  5 +-
 .../accumulo/trace/instrument/CountSampler.java |  4 +-
 185 files changed, 398 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
index 90e8637,1de833b..689bc6b
--- a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
@@@ -16,6 -16,6 +16,8 @@@
   */
  package org.apache.accumulo.core.client;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++
  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.Map.Entry;
@@@ -28,10 -28,9 +30,8 @@@ import org.apache.accumulo.core.data.Ke
  import org.apache.accumulo.core.data.PartialKey;
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.Value;
 -import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.hadoop.io.Text;
  
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
  /**
   * A scanner that presents a row isolated view of an accumulo table. Rows are buffered in memory on the client side. If you think your rows may not fit into
   * memory, then you can provide an alternative row buffer factory to the constructor. This would allow rows to be buffered to disk for example.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
index 73e7f10,32a71bc..a780846
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
@@@ -17,6 -17,6 +17,7 @@@
  package org.apache.accumulo.core.client.impl;
  
  import static com.google.common.base.Preconditions.checkArgument;
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  
  import java.net.UnknownHostException;
  import java.util.List;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/impl/NamespaceOperationsImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/NamespaceOperationsImpl.java
index 0716122,c2cc04e..7bd97a6
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/NamespaceOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/NamespaceOperationsImpl.java
@@@ -51,8 -50,8 +51,8 @@@ import org.apache.accumulo.core.master.
  import org.apache.accumulo.core.master.thrift.MasterClientService;
  import org.apache.accumulo.core.trace.Tracer;
  import org.apache.accumulo.core.util.OpTimer;
- import org.slf4j.LoggerFactory;
 -import org.apache.log4j.Level;
 -import org.apache.log4j.Logger;
 +import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
  
  public class NamespaceOperationsImpl extends NamespaceOperationsHelper {
    private final ClientContext context;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java
index 9fdbb25,b7fd127..2bbe113
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java
@@@ -16,6 -16,6 +16,8 @@@
   */
  package org.apache.accumulo.core.client.impl;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++
  import java.util.Collection;
  import java.util.Collections;
  import java.util.List;
@@@ -37,11 -37,9 +39,9 @@@ import org.apache.accumulo.core.zookeep
  import org.apache.accumulo.fate.zookeeper.ZooCache;
  import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
  import org.apache.hadoop.io.Text;
 -import org.apache.log4j.Level;
 -import org.apache.log4j.Logger;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
  public class RootTabletLocator extends TabletLocator {
  
    private final TabletServerLockChecker lockChecker;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
index 501b4df,2e406d2..84295c4
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
@@@ -17,6 -17,6 +17,7 @@@
  package org.apache.accumulo.core.client.impl;
  
  import static com.google.common.base.Preconditions.checkArgument;
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  import static java.nio.charset.StandardCharsets.UTF_8;
  
  import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java
index 6e7e072,0000000..9275a1e
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java
@@@ -1,115 -1,0 +1,115 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.Callable;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.TableNotFoundException;
- import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
++import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Syncs itself with the static collection of TabletLocators, so that when the server clears it, it will automatically get the most up-to-date version. Caching
 + * TabletLocators locally is safe when using SyncingTabletLocator.
 + */
 +public class SyncingTabletLocator extends TabletLocator {
 +  private static final Logger log = Logger.getLogger(SyncingTabletLocator.class);
 +
 +  private volatile TabletLocator locator;
 +  private final Callable<TabletLocator> getLocatorFunction;
 +
 +  public SyncingTabletLocator(Callable<TabletLocator> getLocatorFunction) {
 +    this.getLocatorFunction = getLocatorFunction;
 +    try {
 +      this.locator = getLocatorFunction.call();
 +    } catch (Exception e) {
 +      log.error("Problem obtaining TabletLocator", e);
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  public SyncingTabletLocator(final ClientContext context, final String tableId) {
 +    this(new Callable<TabletLocator>() {
 +      @Override
 +      public TabletLocator call() throws Exception {
 +        return TabletLocator.getLocator(context, tableId);
 +      }
 +    });
 +  }
 +
 +  private TabletLocator syncLocator() {
 +    TabletLocator loc = this.locator;
 +    if (!loc.isValid())
 +      synchronized (this) {
 +        if (locator == loc)
 +          try {
 +            loc = locator = getLocatorFunction.call();
 +          } catch (Exception e) {
 +            log.error("Problem obtaining TabletLocator", e);
 +            throw new RuntimeException(e);
 +          }
 +      }
 +    return loc;
 +  }
 +
 +  @Override
 +  public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException,
 +      TableNotFoundException {
 +    return syncLocator().locateTablet(context, row, skipRow, retry);
 +  }
 +
 +  @Override
 +  public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    syncLocator().binMutations(context, mutations, binnedMutations, failures);
 +  }
 +
 +  @Override
 +  public List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
 +      AccumuloSecurityException, TableNotFoundException {
 +    return syncLocator().binRanges(context, ranges, binnedRanges);
 +  }
 +
 +  @Override
 +  public void invalidateCache(KeyExtent failedExtent) {
 +    syncLocator().invalidateCache(failedExtent);
 +  }
 +
 +  @Override
 +  public void invalidateCache(Collection<KeyExtent> keySet) {
 +    syncLocator().invalidateCache(keySet);
 +  }
 +
 +  @Override
 +  public void invalidateCache() {
 +    syncLocator().invalidateCache();
 +  }
 +
 +  @Override
 +  public void invalidateCache(Instance instance, String server) {
 +    syncLocator().invalidateCache(instance, server);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
index 8645dd2,63b44d5..dfbb2cc
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
@@@ -77,7 -73,6 +77,7 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.impl.thrift.TDiskUsage;
  import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
  import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
++import org.apache.accumulo.core.client.sample.SamplerConfiguration;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.ConfigurationCopy;
  import org.apache.accumulo.core.conf.Property;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
index 5932fda,63f3eb5..1fa71a2
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
@@@ -16,6 -16,6 +16,8 @@@
   */
  package org.apache.accumulo.core.client.impl;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++
  import java.io.Serializable;
  import java.util.ArrayList;
  import java.util.Collection;
@@@ -46,16 -45,15 +48,14 @@@ import org.apache.accumulo.core.data.im
  import org.apache.accumulo.core.util.OpTimer;
  import org.apache.accumulo.core.util.Pair;
  import org.apache.accumulo.core.util.TextUtil;
 -import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableComparator;
 -import org.apache.log4j.Level;
 -import org.apache.log4j.Logger;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
  public class TabletLocatorImpl extends TabletLocator {
  
 -  private static final Logger log = Logger.getLogger(TabletLocatorImpl.class);
 +  private static final Logger log = LoggerFactory.getLogger(TabletLocatorImpl.class);
  
    // there seems to be a bug in TreeMap.tailMap related to
    // putting null in the treemap.. therefore instead of

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
index 6e92b68,c0cb219..5d52741
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
@@@ -16,8 -16,13 +16,11 @@@
   */
  package org.apache.accumulo.core.client.impl;
  
 -import java.util.Collection;
+ import java.util.List;
+ import java.util.Map;
+ 
  import org.apache.accumulo.core.client.AccumuloException;
  import org.apache.accumulo.core.client.AccumuloSecurityException;
 -import org.apache.accumulo.core.client.Instance;
  import org.apache.accumulo.core.client.TableNotFoundException;
  import org.apache.accumulo.core.client.TimedOutException;
  import org.apache.accumulo.core.data.Mutation;
@@@ -25,16 -30,12 +28,13 @@@ import org.apache.accumulo.core.data.Ra
  import org.apache.accumulo.core.data.impl.KeyExtent;
  import org.apache.hadoop.io.Text;
  
- import java.util.List;
- import java.util.Map;
- 
  /**
 - *
 + * Throws a {@link TimedOutException} if the specified timeout duration elapses between two failed TabletLocator calls.
 + * <p>
 + * This class is safe to cache locally.
   */
 -public class TimeoutTabletLocator extends TabletLocator {
 +public class TimeoutTabletLocator extends SyncingTabletLocator {
  
 -  private TabletLocator locator;
    private long timeout;
    private Long firstFailTime = null;
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index 6165346,967daf6..7d75f64
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@@ -16,6 -16,6 +16,8 @@@
   */
  package org.apache.accumulo.core.client.mapred;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++
  import java.io.IOException;
  import java.net.InetAddress;
  import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 9ccf78a,83aa269..1925d0e
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@@ -16,6 -16,6 +16,8 @@@
   */
  package org.apache.accumulo.core.client.mapreduce;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++
  import java.io.IOException;
  import java.net.InetAddress;
  import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
index 1e89500,2f2dc5b..840af2f
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
@@@ -34,8 -34,8 +34,8 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.ZooKeeperInstance;
  import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils;
  import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource;
- import org.apache.accumulo.core.client.sample.SamplerConfiguration;
  import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
 -import org.apache.accumulo.core.client.mock.MockInstance;
++import org.apache.accumulo.core.client.sample.SamplerConfiguration;
  import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
  import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
  import org.apache.accumulo.core.data.ByteSequence;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
index de89137,ffb9bc5..6ce9d28
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
@@@ -65,12 -65,7 +67,11 @@@ import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.Text;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
- import static com.google.common.base.Preconditions.checkArgument;
  
 +/**
 + * @deprecated since 1.8.0; use MiniAccumuloCluster or a standard mock framework instead.
 + */
 +@Deprecated
  class MockTableOperations extends TableOperationsHelper {
    private static final Logger log = LoggerFactory.getLogger(MockTableOperations.class);
    private static final byte[] ZERO = {0};

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/data/Key.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index bba8aeb,d8232b0..5cfe824
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@@ -41,14 -41,10 +41,14 @@@ import org.apache.accumulo.core.file.bl
  import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.Scalar;
  import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
  import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
- import org.apache.accumulo.core.security.crypto.CryptoModule;
- import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
- import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
- import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy;
 +import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
 +import org.apache.accumulo.core.file.streams.PositionedDataOutputStream;
 +import org.apache.accumulo.core.file.streams.PositionedOutput;
 +import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
+ import org.apache.accumulo.core.security.crypto.CryptoModule;
+ import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
+ import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
+ import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
index 4769818,0000000..a5ca790
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
@@@ -1,68 -1,0 +1,69 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.file.streams;
 +
 +import java.io.FilterOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.util.Objects;
++
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +
 +/**
 + * Utility functions for {@link PositionedOutput}.
 + */
 +public class PositionedOutputs {
 +  private PositionedOutputs() {}
 +
 +  /** Convert an {@code OutputStream} into an {@code OutputStream} implementing {@link PositionedOutput}. */
 +  public static PositionedOutputStream wrap(final OutputStream fout) {
 +    Objects.requireNonNull(fout);
 +    if (fout instanceof FSDataOutputStream) {
 +      return new PositionedOutputStream(fout) {
 +        @Override
 +        public long position() throws IOException {
 +          return ((FSDataOutputStream) fout).getPos();
 +        }
 +      };
 +    } else if (fout instanceof PositionedOutput) {
 +      return new PositionedOutputStream(fout) {
 +        @Override
 +        public long position() throws IOException {
 +          return ((PositionedOutput) fout).position();
 +        }
 +      };
 +    } else {
 +      return new PositionedOutputStream(fout) {
 +        @Override
 +        public long position() throws IOException {
 +          throw new UnsupportedOperationException("Underlying stream does not support position()");
 +        }
 +      };
 +    }
 +  }
 +
 +  private static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
 +    public PositionedOutputStream(OutputStream stream) {
 +      super(stream);
 +    }
 +
 +    @Override
 +    public void write(byte[] data, int off, int len) throws IOException {
 +      out.write(data, off, len);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
index 5254086,0000000..33c16cd
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
@@@ -1,69 -1,0 +1,70 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.file.streams;
 +
 +import java.io.FilterInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
++
 +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
 +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 +import org.apache.hadoop.fs.Seekable;
 +
 +/**
 + * A decorator for an {@code InputStream} which limits the rate at which reads are performed.
 + */
 +public class RateLimitedInputStream extends FilterInputStream implements Seekable {
 +  private final RateLimiter rateLimiter;
 +
 +  public <StreamType extends InputStream & Seekable> RateLimitedInputStream(StreamType stream, RateLimiter rateLimiter) {
 +    super(stream);
 +    this.rateLimiter = rateLimiter == null ? NullRateLimiter.INSTANCE : rateLimiter;
 +  }
 +
 +  @Override
 +  public int read() throws IOException {
 +    int val = in.read();
 +    if (val >= 0) {
 +      rateLimiter.acquire(1);
 +    }
 +    return val;
 +  }
 +
 +  @Override
 +  public int read(byte[] buffer, int offset, int length) throws IOException {
 +    int count = in.read(buffer, offset, length);
 +    if (count > 0) {
 +      rateLimiter.acquire(count);
 +    }
 +    return count;
 +  }
 +
 +  @Override
 +  public void seek(long pos) throws IOException {
 +    ((Seekable) in).seek(pos);
 +  }
 +
 +  @Override
 +  public long getPos() throws IOException {
 +    return ((Seekable) in).getPos();
 +  }
 +
 +  @Override
 +  public boolean seekToNewSource(long targetPos) throws IOException {
 +    return ((Seekable) in).seekToNewSource(targetPos);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
index b426a6b,0000000..417b89c
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
@@@ -1,57 -1,0 +1,58 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.accumulo.core.file.streams;
 +
 +import java.io.FilterOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
++
 +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
 +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 +
 +/**
 + * A decorator for {@code OutputStream} which limits the rate at which data may be written.
 + */
 +public class RateLimitedOutputStream extends FilterOutputStream implements PositionedOutput {
 +  private final RateLimiter writeLimiter;
 +
 +  public RateLimitedOutputStream(OutputStream wrappedStream, RateLimiter writeLimiter) {
 +    super(PositionedOutputs.wrap(wrappedStream));
 +    this.writeLimiter = writeLimiter == null ? NullRateLimiter.INSTANCE : writeLimiter;
 +  }
 +
 +  @Override
 +  public void write(int i) throws IOException {
 +    writeLimiter.acquire(1);
 +    out.write(i);
 +  }
 +
 +  @Override
 +  public void write(byte[] buffer, int offset, int length) throws IOException {
 +    writeLimiter.acquire(length);
 +    out.write(buffer, offset, length);
 +  }
 +
 +  @Override
 +  public void close() throws IOException {
 +    out.close();
 +  }
 +
 +  @Override
 +  public long position() throws IOException {
 +    return ((PositionedOutput) out).position();
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
index 09060f5,0000000..0a53b74
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
@@@ -1,46 -1,0 +1,47 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.file.streams;
 +
 +import java.io.DataInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
++
 +import org.apache.hadoop.fs.Seekable;
 +
 +/**
 + * A wrapper converting a {@link Seekable} {@code InputStream} into a {@code Seekable} {@link DataInputStream}.
 + */
 +public class SeekableDataInputStream extends DataInputStream implements Seekable {
 +  public <StreamType extends InputStream & Seekable> SeekableDataInputStream(StreamType stream) {
 +    super(stream);
 +  }
 +
 +  @Override
 +  public void seek(long pos) throws IOException {
 +    ((Seekable) in).seek(pos);
 +  }
 +
 +  @Override
 +  public long getPos() throws IOException {
 +    return ((Seekable) in).getPos();
 +  }
 +
 +  @Override
 +  public boolean seekToNewSource(long targetPos) throws IOException {
 +    return ((Seekable) in).seekToNewSource(targetPos);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
index a1b4969,ac8355b..dba3a32
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
@@@ -37,6 -33,6 +36,8 @@@ import org.apache.accumulo.core.iterato
  import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
  import org.apache.commons.lang.mutable.MutableLong;
  
++import com.google.common.collect.ImmutableSet;
++
  /**
   *
   */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
index 9cb3bd7,1f06a71..f1732f7
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
@@@ -31,9 -30,7 +30,11 @@@ import org.apache.accumulo.core.data.Ra
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.iterators.IteratorEnvironment;
  import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.YieldCallback;
 +import org.apache.accumulo.core.iterators.YieldingKeyValueIterator;
 +
++import com.google.common.base.Optional;
+ 
  /**
   * A SortedKeyValueIterator which presents a view over some section of data, regardless of whether or not it is backed by memory (InMemoryMap) or an RFile
   * (InMemoryMap that was minor compacted to a file). Clients reading from a table that has data in memory should not see interruption in their scan when that

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceOpts.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceOpts.java
index 53cbfb0,0000000..d1a0911
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceOpts.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceOpts.java
@@@ -1,125 -1,0 +1,125 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.iterators.user;
 +
- import org.apache.accumulo.core.iterators.OptionDescriber;
- import org.apache.hadoop.io.Text;
++import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Map;
 +
- import static java.nio.charset.StandardCharsets.UTF_8;
++import org.apache.accumulo.core.iterators.OptionDescriber;
++import org.apache.hadoop.io.Text;
 +
 +public class CfCqSliceOpts {
 +  public static final String OPT_MIN_CF = "minCf";
 +  public static final String OPT_MIN_CF_DESC = "UTF-8 encoded string representing minimum column family. "
 +      + "Optional parameter. If minCf and minCq are undefined, the column slice will start at the first column "
 +      + "of each row. If you want to do an exact match on column families, it's more efficient to leave minCf "
 +      + "and maxCf undefined and use the scanner's fetchColumnFamily method.";
 +
 +  public static final String OPT_MIN_CQ = "minCq";
 +  public static final String OPT_MIN_CQ_DESC = "UTF-8 encoded string representing minimum column qualifier. "
 +      + "Optional parameter. If minCf and minCq are undefined, the column slice will start at the first column " + "of each row.";
 +
 +  public static final String OPT_MAX_CF = "maxCf";
 +  public static final String OPT_MAX_CF_DESC = "UTF-8 encoded string representing maximum column family. "
 +      + "Optional parameter. If minCf and minCq are undefined, the column slice will start at the first column "
 +      + "of each row. If you want to do an exact match on column families, it's more efficient to leave minCf "
 +      + "and maxCf undefined and use the scanner's fetchColumnFamily method.";
 +
 +  public static final String OPT_MAX_CQ = "maxCq";
 +  public static final String OPT_MAX_CQ_DESC = "UTF-8 encoded string representing maximum column qualifier. "
 +      + "Optional parameter. If maxCf and MaxCq are undefined, the column slice will end at the last column of " + "each row.";
 +
 +  public static final String OPT_MIN_INCLUSIVE = "minInclusive";
 +  public static final String OPT_MIN_INCLUSIVE_DESC = "UTF-8 encoded string indicating whether to include the "
 +      + "minimum column in the slice range. Optional parameter, default is true.";
 +
 +  public static final String OPT_MAX_INCLUSIVE = "maxInclusive";
 +  public static final String OPT_MAX_INCLUSIVE_DESC = "UTF-8 encoded string indicating whether to include the "
 +      + "maximum column in the slice range. Optional parameter, default is true.";
 +
 +  Text minCf;
 +  Text minCq;
 +
 +  Text maxCf;
 +  Text maxCq;
 +
 +  boolean minInclusive;
 +  boolean maxInclusive;
 +
 +  public CfCqSliceOpts(CfCqSliceOpts o) {
 +    minCf = new Text(o.minCf);
 +    minCq = new Text(o.minCq);
 +    maxCf = new Text(o.maxCf);
 +    maxCq = new Text(o.maxCq);
 +    minInclusive = o.minInclusive;
 +    maxInclusive = o.maxInclusive;
 +  }
 +
 +  public CfCqSliceOpts(Map<String,String> options) {
 +    String optStr = options.get(OPT_MIN_CF);
 +    minCf = optStr == null ? new Text() : new Text(optStr.getBytes(UTF_8));
 +
 +    optStr = options.get(OPT_MIN_CQ);
 +    minCq = optStr == null ? new Text() : new Text(optStr.getBytes(UTF_8));
 +
 +    optStr = options.get(OPT_MAX_CF);
 +    maxCf = optStr == null ? new Text() : new Text(optStr.getBytes(UTF_8));
 +
 +    optStr = options.get(OPT_MAX_CQ);
 +    maxCq = optStr == null ? new Text() : new Text(optStr.getBytes(UTF_8));
 +
 +    optStr = options.get(OPT_MIN_INCLUSIVE);
 +    minInclusive = optStr == null || optStr.isEmpty() ? true : Boolean.valueOf(options.get(OPT_MIN_INCLUSIVE));
 +
 +    optStr = options.get(OPT_MAX_INCLUSIVE);
 +    maxInclusive = optStr == null || optStr.isEmpty() ? true : Boolean.valueOf(options.get(OPT_MAX_INCLUSIVE));
 +  }
 +
 +  static class Describer implements OptionDescriber {
 +    @Override
 +    public OptionDescriber.IteratorOptions describeOptions() {
 +      Map<String,String> options = new HashMap<>();
 +      options.put(OPT_MIN_CF, OPT_MIN_CF_DESC);
 +      options.put(OPT_MIN_CQ, OPT_MIN_CQ_DESC);
 +      options.put(OPT_MAX_CF, OPT_MAX_CF_DESC);
 +      options.put(OPT_MAX_CQ, OPT_MAX_CQ_DESC);
 +      options.put(OPT_MIN_INCLUSIVE, OPT_MIN_INCLUSIVE_DESC);
 +      options.put(OPT_MAX_INCLUSIVE, OPT_MAX_INCLUSIVE_DESC);
 +      return new OptionDescriber.IteratorOptions("ColumnSliceFilter", "Returns all key/value pairs where the column is between the specified values", options,
 +          Collections.<String> emptyList());
 +    }
 +
 +    @Override
 +    public boolean validateOptions(Map<String,String> options) {
 +      // If neither a max CF nor a max CQ is specified, the slice has no upper bound, and in that case max inclusive
 +      // must not be set to false. NOTE(review): the check below tests maxInclusive only when BOTH are set -- confirm.
 +      CfCqSliceOpts o = new CfCqSliceOpts(options);
 +      boolean boundsOk = true;
 +      boolean upperBoundsExist = o.maxCf.getLength() > 0 && o.maxCq.getLength() > 0;
 +      if (upperBoundsExist) {
 +        boundsOk = o.maxInclusive;
 +      }
 +      boolean cqRangeOk = o.maxCq.getLength() == 0 || (o.minCq.compareTo(o.maxCq) < 1);
 +      boolean cfRangeOk = o.maxCf.getLength() == 0 || (o.minCf.compareTo(o.maxCf) < 1);
 +      return boundsOk && cqRangeOk && cfRangeOk;
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceSeekingFilter.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceSeekingFilter.java
index e5c4969,0000000..ababdcb
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceSeekingFilter.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceSeekingFilter.java
@@@ -1,134 -1,0 +1,134 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.iterators.user;
 +
++import java.io.IOException;
++import java.util.Map;
++
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.OptionDescriber;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +
- import java.io.IOException;
- import java.util.Map;
- 
 +/**
 + * Filters key/value pairs for a range of column families and a range of column qualifiers. Only keys which fall in both ranges will be passed by the filter.
 + * Note that if you have a small, well-defined set of column families it will be much more efficient to configure locality groups to isolate that data instead
 + * of configuring this iterator to seek over it.
 + *
 + * This filter may be more efficient than the CfCqSliceFilter or the ColumnSlice filter for small slices of large rows as it will seek to the next potential
 + * match once it determines that it has iterated past the end of a slice.
 + *
 + * @see org.apache.accumulo.core.iterators.user.CfCqSliceOpts for a description of this iterator's options.
 + */
 +public class CfCqSliceSeekingFilter extends SeekingFilter implements OptionDescriber {
 +
 +  private static final FilterResult SKIP_TO_HINT = FilterResult.of(false, AdvanceResult.USE_HINT);
 +  private static final FilterResult SKIP_TO_NEXT = FilterResult.of(false, AdvanceResult.NEXT);
 +  private static final FilterResult SKIP_TO_NEXT_ROW = FilterResult.of(false, AdvanceResult.NEXT_ROW);
 +  private static final FilterResult SKIP_TO_NEXT_CF = FilterResult.of(false, AdvanceResult.NEXT_CF);
 +  private static final FilterResult INCLUDE_AND_NEXT = FilterResult.of(true, AdvanceResult.NEXT);
 +  private static final FilterResult INCLUDE_AND_NEXT_CF = FilterResult.of(true, AdvanceResult.NEXT_CF);
 +
 +  private CfCqSliceOpts cso;
 +
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
 +    super.init(source, options, env);
 +    cso = new CfCqSliceOpts(options);
 +  }
 +
 +  @Override
 +  public FilterResult filter(Key k, Value v) {
 +    if (cso.minCf.getLength() > 0) {
 +      int minCfCmp = k.compareColumnFamily(cso.minCf);
 +      if (minCfCmp < 0) {
 +        return SKIP_TO_HINT; // hint will be the min CF in this row.
 +      }
 +      if (minCfCmp == 0 && !cso.minInclusive) {
 +        return SKIP_TO_NEXT;
 +      }
 +    }
 +    if (cso.maxCf.getLength() > 0) {
 +      int maxCfCmp = k.compareColumnFamily(cso.maxCf);
 +      if (maxCfCmp > 0 || (maxCfCmp == 0 && !cso.maxInclusive)) {
 +        return SKIP_TO_NEXT_ROW;
 +      }
 +    }
 +    // at this point we're in the correct CF range, now check the CQ.
 +    if (cso.minCq.getLength() > 0) {
 +      int minCqCmp = k.compareColumnQualifier(cso.minCq);
 +      if (minCqCmp < 0) {
 +        return SKIP_TO_HINT; // hint will be the min CQ in this CF in this row.
 +      }
 +      if (minCqCmp == 0 && !cso.minInclusive) {
 +        return SKIP_TO_NEXT;
 +      }
 +    }
 +    if (cso.maxCq.getLength() > 0) {
 +      int maxCqCmp = k.compareColumnQualifier(cso.maxCq);
 +      if (maxCqCmp > 0 || (maxCqCmp == 0 && !cso.maxInclusive)) {
 +        return SKIP_TO_NEXT_CF;
 +      }
 +      if (maxCqCmp == 0) {
 +        // special-case here: we know we're at the last CQ in the slice, so skip to the next CF in the row.
 +        return INCLUDE_AND_NEXT_CF;
 +      }
 +    }
 +    // at this point we're in the CQ slice.
 +    return INCLUDE_AND_NEXT;
 +  }
 +
 +  @Override
 +  public Key getNextKeyHint(Key k, Value v) throws IllegalArgumentException {
 +    if (cso.minCf.getLength() > 0) {
 +      int minCfCmp = k.compareColumnFamily(cso.minCf);
 +      if (minCfCmp < 0) {
 +        Key hint = new Key(k.getRow(), cso.minCf);
 +        return cso.minInclusive ? hint : hint.followingKey(PartialKey.ROW_COLFAM);
 +      }
 +    }
 +    if (cso.minCq.getLength() > 0) {
 +      int minCqCmp = k.compareColumnQualifier(cso.minCq);
 +      if (minCqCmp < 0) {
 +        Key hint = new Key(k.getRow(), k.getColumnFamily(), cso.minCq);
 +        return cso.minInclusive ? hint : hint.followingKey(PartialKey.ROW_COLFAM_COLQUAL);
 +      }
 +    }
 +    // If we get here it means that we were asked to provide a hint for a key that we
 +    // didn't return USE_HINT for.
 +    throw new IllegalArgumentException("Don't know how to provide hint for key " + k);
 +  }
 +
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    CfCqSliceSeekingFilter o = (CfCqSliceSeekingFilter) super.deepCopy(env);
 +    o.cso = new CfCqSliceOpts(cso);
 +    return o;
 +  }
 +
 +  @Override
 +  public IteratorOptions describeOptions() {
 +    return new CfCqSliceOpts.Describer().describeOptions();
 +  }
 +
 +  @Override
 +  public boolean validateOptions(Map<String,String> options) {
 +    return new CfCqSliceOpts.Describer().validateOptions(options);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/iterators/user/SeekingFilter.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/user/SeekingFilter.java
index bdc9b14,0000000..56e0e88
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/SeekingFilter.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/SeekingFilter.java
@@@ -1,220 -1,0 +1,220 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.iterators.user;
 +
++import java.io.IOException;
++import java.util.Collection;
++import java.util.EnumMap;
++import java.util.Map;
++
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.WrappingIterator;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import java.io.IOException;
- import java.util.Collection;
- import java.util.EnumMap;
- import java.util.Map;
- 
 +/**
 + * Base class for filters that can skip over key-value pairs which do not match their filter predicate. In addition to returning true/false to accept or reject
 + * a kv pair, subclasses can return an extra field which indicates how far the source iterator should be advanced.
 + *
 + * Note that the behaviour of the negate option is different from the Filter class. If a KV pair fails the subclass' filter predicate and negate is true, then
 + * the KV pair will pass the filter. However if the subclass advances the source past a bunch of KV pairs, all those pairs will be implicitly rejected and
 + * negate will have no effect.
 + *
 + * @see org.apache.accumulo.core.iterators.Filter
 + */
 +public abstract class SeekingFilter extends WrappingIterator {
 +  private static final Logger log = LoggerFactory.getLogger(SeekingFilter.class);
 +
 +  protected static final String NEGATE = "negate";
 +
 +  public enum AdvanceResult {
 +    NEXT, NEXT_CQ, NEXT_CF, NEXT_ROW, USE_HINT
 +  }
 +
 +  public static class FilterResult {
 +    private static final EnumMap<AdvanceResult,FilterResult> PASSES = new EnumMap<>(AdvanceResult.class);
 +    private static final EnumMap<AdvanceResult,FilterResult> FAILS = new EnumMap<>(AdvanceResult.class);
 +    static {
 +      for (AdvanceResult ar : AdvanceResult.values()) {
 +        PASSES.put(ar, new FilterResult(true, ar));
 +        FAILS.put(ar, new FilterResult(false, ar));
 +      }
 +    }
 +
 +    final boolean accept;
 +    final AdvanceResult advance;
 +
 +    public FilterResult(boolean accept, AdvanceResult advance) {
 +      this.accept = accept;
 +      this.advance = advance;
 +    }
 +
 +    public static FilterResult of(boolean accept, AdvanceResult advance) {
 +      return accept ? PASSES.get(advance) : FAILS.get(advance);
 +    }
 +
 +    public String toString() {
 +      return "Acc: " + accept + " Adv: " + advance;
 +    }
 +  }
 +
 +  /**
 +   * Subclasses must provide an implementation which examines the given key and value and determines (1) whether to accept the KV pair and (2) how far to
 +   * advance the source iterator past the key.
 +   *
 +   * @param k
 +   *          a key
 +   * @param v
 +   *          a value
 +   * @return indicating whether to pass or block the key, and how far the source iterator should be advanced.
 +   */
 +  public abstract FilterResult filter(Key k, Value v);
 +
 +  /**
 +   * Whenever the subclass returns AdvanceResult.USE_HINT from its filter predicate, this method will be called to see how far to advance the source iterator.
 +   * The return value must be a key which is greater than (sorts after) the input key. If the subclass never returns USE_HINT, this method will never be called
 +   * and may safely return null.
 +   *
 +   * @param k
 +   *          a key
 +   * @param v
 +   *          a value
 +   * @return as above
 +   */
 +  public abstract Key getNextKeyHint(Key k, Value v);
 +
 +  private Collection<ByteSequence> columnFamilies;
 +  private boolean inclusive;
 +  private Range seekRange;
 +  private boolean negate;
 +
 +  private AdvanceResult advance;
 +
 +  private boolean advancedPastSeek = false;
 +
 +  @Override
 +  public void next() throws IOException {
 +    advanceSource(getSource(), advance);
 +    findTop();
 +  }
 +
 +  @Override
 +  public boolean hasTop() {
 +    return !advancedPastSeek && super.hasTop();
 +  }
 +
 +  @Override
 +  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
 +    super.seek(range, columnFamilies, inclusive);
 +    advance = null;
 +    this.columnFamilies = columnFamilies;
 +    this.inclusive = inclusive;
 +    seekRange = range;
 +    advancedPastSeek = false;
 +    findTop();
 +  }
 +
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
 +    super.init(source, options, env);
 +    negate = Boolean.parseBoolean(options.get(NEGATE));
 +  }
 +
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    SeekingFilter newInstance;
 +    try {
 +      newInstance = this.getClass().newInstance();
 +    } catch (Exception e) {
 +      throw new RuntimeException(e);
 +    }
 +    newInstance.setSource(getSource().deepCopy(env));
 +    newInstance.negate = negate;
 +    return newInstance;
 +  }
 +
 +  protected void findTop() throws IOException {
 +    SortedKeyValueIterator<Key,Value> src = getSource();
 +    // advance could be null if we've just been seeked
 +    advance = null;
 +    while (src.hasTop() && !advancedPastSeek) {
 +      if (src.getTopKey().isDeleted()) {
 +        // as per. o.a.a.core.iterators.Filter, deleted keys always pass through the filter.
 +        advance = AdvanceResult.NEXT;
 +        return;
 +      }
 +      FilterResult f = filter(src.getTopKey(), src.getTopValue());
 +      if (log.isTraceEnabled()) {
 +        log.trace("Filtered: {} result == {} hint == {}", src.getTopKey(), f,
 +            f.advance == AdvanceResult.USE_HINT ? getNextKeyHint(src.getTopKey(), src.getTopValue()) : " (none)");
 +      }
 +      if (f.accept != negate) {
 +        // advance will be processed when next is called
 +        advance = f.advance;
 +        break;
 +      } else {
 +        advanceSource(src, f.advance);
 +      }
 +    }
 +  }
 +
 +  private void advanceSource(SortedKeyValueIterator<Key,Value> src, AdvanceResult adv) throws IOException {
 +    Key topKey = src.getTopKey();
 +    Range advRange = null;
 +    switch (adv) {
 +      case NEXT:
 +        src.next();
 +        return;
 +      case NEXT_CQ:
 +        advRange = new Range(topKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL), null);
 +        break;
 +      case NEXT_CF:
 +        advRange = new Range(topKey.followingKey(PartialKey.ROW_COLFAM), null);
 +        break;
 +      case NEXT_ROW:
 +        advRange = new Range(topKey.followingKey(PartialKey.ROW), null);
 +        break;
 +      case USE_HINT:
 +        Value topVal = src.getTopValue();
 +        Key hintKey = getNextKeyHint(topKey, topVal);
 +        if (hintKey != null && hintKey.compareTo(topKey) > 0) {
 +          advRange = new Range(hintKey, null);
 +        } else {
 +          String msg = "Filter returned USE_HINT for " + topKey + " but invalid hint: " + hintKey;
 +          throw new IOException(msg);
 +        }
 +        break;
 +    }
 +    if (advRange == null) {
 +      // Should never get here. Just a safeguard in case somebody adds a new type of AdvanceRange and forgets to handle it here.
 +      throw new IOException("Unable to determine range to advance to for AdvanceResult " + adv);
 +    }
 +    advRange = advRange.clip(seekRange, true);
 +    if (advRange == null) {
 +      // the advanced range is outside the seek range. the source is exhausted.
 +      advancedPastSeek = true;
 +    } else {
 +      src.seek(advRange, columnFamilies, inclusive);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
index 4fa47c3,a99fe01..7e9c82b
--- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@@ -47,6 -44,6 +45,8 @@@ import org.apache.commons.lang.mutable.
  import org.apache.hadoop.io.Text;
  
  import com.google.common.base.Joiner;
++import com.google.common.collect.ImmutableSet;
++import com.google.common.collect.ImmutableSet.Builder;
  
  public class LocalityGroupUtil {
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/util/format/BinaryFormatter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/util/format/DateStringFormatter.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/format/DateStringFormatter.java
index 63bd536,5bcd4a3..d651a22
--- a/core/src/main/java/org/apache/accumulo/core/util/format/DateStringFormatter.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/format/DateStringFormatter.java
@@@ -16,8 -16,11 +16,9 @@@
   */
  package org.apache.accumulo.core.util.format;
  
 -import java.text.DateFormat;
 -import java.text.SimpleDateFormat;
  import java.util.Map.Entry;
  import java.util.TimeZone;
+ 
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Value;
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/util/format/DefaultFormatter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index ac1eec9,0000000..83be6e4
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@@ -1,180 -1,0 +1,182 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.util.ratelimit;
 +
- import com.google.common.collect.ImmutableMap;
 +import java.util.Map;
 +import java.util.Timer;
 +import java.util.TimerTask;
 +import java.util.WeakHashMap;
++
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import com.google.common.collect.ImmutableMap;
++
 +/**
 + * Provides the ability to retrieve a {@link RateLimiter} keyed to a specific string, which will dynamically update its rate according to a specified callback
 + * function.
 + */
 +public class SharedRateLimiterFactory {
 +  // Period, in milliseconds, between activity reports to the debug log.
 +  private static final long REPORT_RATE = 60000;
 +  // Period, in milliseconds, between polls of each limiter's RateProvider.
 +  private static final long UPDATE_RATE = 1000;
 +  private static SharedRateLimiterFactory instance = null;
 +  private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
 +  // Guarded by synchronized (activeLimiters).
 +  // NOTE(review): each SharedRateLimiter keeps a strong reference to its name, which is also the map key,
 +  // so these weak entries are presumably never cleared -- confirm whether that is intended.
 +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
 +
 +  private SharedRateLimiterFactory() {}
 +
 +  /**
 +   * Get the singleton instance of the SharedRateLimiterFactory. The first call also starts a timer which periodically updates and reports on all limiters.
 +   */
 +  public static synchronized SharedRateLimiterFactory getInstance() {
 +    if (instance == null) {
 +      instance = new SharedRateLimiterFactory();
 +
 +      // NOTE(review): java.util.Timer(String) starts a non-daemon thread -- confirm this cannot hold up JVM shutdown.
 +      Timer timer = new Timer("SharedRateLimiterFactory update/report polling");
 +
 +      // Update periodically
 +      timer.schedule(new TimerTask() {
 +        @Override
 +        public void run() {
 +          instance.update();
 +        }
 +      }, UPDATE_RATE, UPDATE_RATE);
 +
 +      // Report periodically
 +      timer.schedule(new TimerTask() {
 +        @Override
 +        public void run() {
 +          instance.report();
 +        }
 +      }, REPORT_RATE, REPORT_RATE);
 +    }
 +    return instance;
 +  }
 +
 +  /**
 +   * A callback which provides the current rate for a {@link RateLimiter}.
 +   */
 +  public static interface RateProvider {
 +    /**
 +     * Calculate the current rate for the {@link RateLimiter}.
 +     *
 +     * @return Count of permits which should be provided per second. A nonpositive count is taken to indicate that no rate limiting should be performed.
 +     */
 +    public long getDesiredRate();
 +  }
 +
 +  /**
 +   * Lookup the RateLimiter associated with the specified name, or create a new one for that name.
 +   *
 +   * @param name
 +   *          key for the rate limiter
 +   * @param rateProvider
 +   *          a function which can be called to get what the current rate for the rate limiter should be. Only consulted when a new limiter is created; an
 +   *          existing limiter keeps the provider it was created with.
 +   * @return the limiter registered under {@code name}
 +   */
 +  public RateLimiter create(String name, RateProvider rateProvider) {
 +    synchronized (activeLimiters) {
 +      // One lookup instead of containsKey() followed by get().
 +      SharedRateLimiter limiter = activeLimiters.get(name);
 +      if (limiter == null) {
 +        long initialRate = rateProvider.getDesiredRate();
 +        limiter = new SharedRateLimiter(name, rateProvider, initialRate);
 +        activeLimiters.put(name, limiter);
 +      }
 +      return limiter;
 +    }
 +  }
 +
 +  /**
 +   * Walk through all of the currently active RateLimiters, having each update its current rate. This is called periodically so that we can dynamically update
 +   * as configuration changes.
 +   */
 +  protected void update() {
 +    Map<String,SharedRateLimiter> limitersCopy;
 +    // Snapshot under the lock so the callbacks below run without holding it.
 +    synchronized (activeLimiters) {
 +      limitersCopy = ImmutableMap.copyOf(activeLimiters);
 +    }
 +    for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
 +      try {
 +        entry.getValue().update();
 +      } catch (Exception ex) {
 +        log.error(String.format("Failed to update limiter %s", entry.getKey()), ex);
 +      }
 +    }
 +  }
 +
 +  /** Walk through all of the currently active RateLimiters, having each report its activity to the debug log. */
 +  protected void report() {
 +    Map<String,SharedRateLimiter> limitersCopy;
 +    // Snapshot under the lock so the reporting below runs without holding it.
 +    synchronized (activeLimiters) {
 +      limitersCopy = ImmutableMap.copyOf(activeLimiters);
 +    }
 +    for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
 +      try {
 +        entry.getValue().report();
 +      } catch (Exception ex) {
 +        log.error(String.format("Failed to report limiter %s", entry.getKey()), ex);
 +      }
 +    }
 +  }
 +
 +  /** A rate limiter which re-reads its rate from a {@link RateProvider} and counts acquired permits for reporting. */
 +  protected class SharedRateLimiter extends GuavaRateLimiter {
 +    // Permits acquired since the last report. Atomic because acquire() and report() may run on different
 +    // threads and a volatile long cannot make += or read-then-reset atomic.
 +    private final java.util.concurrent.atomic.AtomicLong permitsAcquired = new java.util.concurrent.atomic.AtomicLong();
 +    // Wall-clock time, in milliseconds, of construction or of the last report that got past the zero-duration check.
 +    private volatile long lastUpdate;
 +
 +    private final RateProvider rateProvider;
 +    private final String name;
 +
 +    SharedRateLimiter(String name, RateProvider rateProvider, long initialRate) {
 +      super(initialRate);
 +      this.name = name;
 +      this.rateProvider = rateProvider;
 +      this.lastUpdate = System.currentTimeMillis();
 +    }
 +
 +    @Override
 +    public void acquire(long permits) {
 +      super.acquire(permits);
 +      permitsAcquired.addAndGet(permits);
 +    }
 +
 +    /** Poll the callback, updating the current rate if necessary. */
 +    public void update() {
 +      // Reset rate if needed
 +      long rate = rateProvider.getDesiredRate();
 +      if (rate != getRate()) {
 +        setRate(rate);
 +      }
 +    }
 +
 +    /** Report the current throughput and usage of this rate limiter to the debug log. */
 +    public void report() {
 +      if (log.isDebugEnabled()) {
 +        long duration = System.currentTimeMillis() - lastUpdate;
 +        // Nothing meaningful to report (and no division by zero below) if no time has passed.
 +        if (duration == 0)
 +          return;
 +        lastUpdate = System.currentTimeMillis();
 +
 +        // Read and reset in one atomic step so permits acquired concurrently are not dropped.
 +        long sum = permitsAcquired.getAndSet(0);
 +
 +        if (sum > 0) {
 +          log.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, sum * 1000L / duration, getRate()));
 +        }
 +      }
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/test/java/org/apache/accumulo/core/client/TestThrift1474.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/client/TestThrift1474.java
index 845439e,a99f415..a0c94ef
--- a/core/src/test/java/org/apache/accumulo/core/client/TestThrift1474.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/TestThrift1474.java
@@@ -16,6 -16,6 +16,7 @@@
   */
  package org.apache.accumulo.core.client;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertTrue;
  import static org.junit.Assert.fail;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/test/java/org/apache/accumulo/core/client/lexicoder/FloatLexicoderTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/client/lexicoder/FloatLexicoderTest.java
index 7ac683a,0000000..2f9e576
mode 100644,000000..100644
--- a/core/src/test/java/org/apache/accumulo/core/client/lexicoder/FloatLexicoderTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/lexicoder/FloatLexicoderTest.java
@@@ -1,45 -1,0 +1,45 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.lexicoder;
 +
- import org.apache.accumulo.core.client.lexicoder.impl.AbstractLexicoderTest;
- 
 +import java.util.Arrays;
 +
++import org.apache.accumulo.core.client.lexicoder.impl.AbstractLexicoderTest;
++
 +/**
 + *
 + */
 +public class FloatLexicoderTest extends AbstractLexicoderTest {
 +
 +  public void testSortOrder() {
 +    assertSortOrder(
 +        new FloatLexicoder(),
 +        Arrays.asList(Float.MIN_VALUE, Float.MAX_VALUE, Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY, 0.0F, 0.01F, 0.001F, 1.0F, -1.0F, -1.1F, -1.01F,
 +            Math.nextUp(Float.NEGATIVE_INFINITY), Math.nextAfter(0.0F, Float.NEGATIVE_INFINITY), Math.nextAfter(Float.MAX_VALUE, Float.NEGATIVE_INFINITY)));
 +
 +  }
 +
 +  public void testDecode() {
 +    assertDecodes(new FloatLexicoder(), Float.MIN_VALUE);
 +    assertDecodes(new FloatLexicoder(), Math.nextUp(Float.NEGATIVE_INFINITY));
 +    assertDecodes(new FloatLexicoder(), -1.0F);
 +    assertDecodes(new FloatLexicoder(), 0.0F);
 +    assertDecodes(new FloatLexicoder(), 1.0F);
 +    assertDecodes(new FloatLexicoder(), Math.nextAfter(Float.POSITIVE_INFINITY, 0.0F));
 +    assertDecodes(new FloatLexicoder(), Float.MAX_VALUE);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/test/java/org/apache/accumulo/core/data/RangeTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
index 9574d36,0000000..183f045
mode 100644,000000..100644
--- a/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
@@@ -1,38 -1,0 +1,39 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.file.streams;
 +
 +import java.util.concurrent.atomic.AtomicLong;
++
 +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 +
 +/** Test implementation of {@link RateLimiter} which never waits and only totals the permits it is asked for. */
 +public class MockRateLimiter implements RateLimiter {
 +  private final AtomicLong acquiredTotal = new AtomicLong();
 +
 +  /** Always reports a rate of zero. */
 +  @Override
 +  public long getRate() {
 +    return 0L;
 +  }
 +
 +  /** Adds {@code permits} to the running total and returns immediately. */
 +  @Override
 +  public void acquire(long permits) {
 +    acquiredTotal.getAndAdd(permits);
 +  }
 +
 +  /** Returns the sum of all permits passed to {@link #acquire(long)} so far. */
 +  public long getPermitsAcquired() {
 +    return acquiredTotal.longValue();
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
index 6baff87,0000000..b480fa9
mode 100644,000000..100644
--- a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
@@@ -1,69 -1,0 +1,70 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.file.streams;
 +
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.util.Random;
++
 +import org.apache.hadoop.fs.Seekable;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +public class RateLimitedInputStreamTest {
 +
 +  /** Reads 100 randomly sized chunks and checks that the limiter was asked for exactly as many permits as bytes were read. */
 +  @Test
 +  public void permitsAreProperlyAcquired() throws Exception {
 +    Random randGen = new Random();
 +    MockRateLimiter rateLimiter = new MockRateLimiter();
 +    long bytesRetrieved = 0;
 +    try (InputStream is = new RateLimitedInputStream(new RandomInputStream(), rateLimiter)) {
 +      for (int i = 0; i < 100; ++i) {
 +        // Bounded nextInt gives 0..65535 directly, without Math.abs on an unbounded int.
 +        int count = randGen.nextInt(65536);
 +        int countRead = is.read(new byte[count]);
 +        Assert.assertEquals(count, countRead);
 +        bytesRetrieved += count;
 +      }
 +    }
 +    Assert.assertEquals(bytesRetrieved, rateLimiter.getPermitsAcquired());
 +  }
 +
 +  /** Endless stream of random bytes; the {@link Seekable} operations are not supported. */
 +  private static class RandomInputStream extends InputStream implements Seekable {
 +    private final Random r = new Random();
 +
 +    @Override
 +    public int read() throws IOException {
 +      return r.nextInt() & 0xff;
 +    }
 +
 +    @Override
 +    public void seek(long pos) throws IOException {
 +      throw new UnsupportedOperationException("Not supported yet.");
 +    }
 +
 +    @Override
 +    public long getPos() throws IOException {
 +      throw new UnsupportedOperationException("Not supported yet.");
 +    }
 +
 +    @Override
 +    public boolean seekToNewSource(long targetPos) throws IOException {
 +      throw new UnsupportedOperationException("Not supported yet.");
 +    }
 +
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
index 9e12354,0000000..f4144dc
mode 100644,000000..100644
--- a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
@@@ -1,56 -1,0 +1,58 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.file.streams;
 +
- import com.google.common.io.ByteStreams;
- import com.google.common.io.CountingOutputStream;
 +import java.io.FilterOutputStream;
 +import java.io.IOException;
 +import java.util.Random;
++
 +import org.junit.Assert;
 +import org.junit.Test;
 +
++import com.google.common.io.ByteStreams;
++import com.google.common.io.CountingOutputStream;
++
 +public class RateLimitedOutputStreamTest {
 +
 +  /** Writes 100 randomly sized chunks and checks that both the stream position and the permits acquired equal the number of bytes written. */
 +  @Test
 +  public void permitsAreProperlyAcquired() throws Exception {
 +    Random sizes = new Random();
 +    MockRateLimiter limiter = new MockRateLimiter();
 +    long expectedTotal = 0;
 +    try (RateLimitedOutputStream out = new RateLimitedOutputStream(new NullOutputStream(), limiter)) {
 +      int remaining = 100;
 +      while (remaining-- > 0) {
 +        int size = Math.abs(sizes.nextInt() % 65536);
 +        byte[] chunk = new byte[size];
 +        out.write(chunk);
 +        expectedTotal += size;
 +      }
 +      Assert.assertEquals(expectedTotal, out.position());
 +    }
 +    Assert.assertEquals(expectedTotal, limiter.getPermitsAcquired());
 +  }
 +
 +  /** Output stream which discards everything written to it while counting the bytes, exposed through {@link #position()}. */
 +  public static class NullOutputStream extends FilterOutputStream implements PositionedOutput {
 +    public NullOutputStream() {
 +      super(new CountingOutputStream(ByteStreams.nullOutputStream()));
 +    }
 +
 +    @Override
 +    public long position() throws IOException {
 +      CountingOutputStream counter = (CountingOutputStream) out;
 +      return counter.getCount();
 +    }
 +  }
 +
 +}


Mime
View raw message