hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [2/2] hbase git commit: HBASE-17508 Unify the implementation of small scan and regular scan for sync client
Date Sun, 05 Feb 2017 00:59:28 GMT
HBASE-17508 Unify the implementation of small scan and regular scan for sync client


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ffa0cea2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ffa0cea2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ffa0cea2

Branch: refs/heads/master
Commit: ffa0cea2a3b1b2e6bb1a57e21f50dbf19159ab04
Parents: 4e77b18
Author: zhangduo <zhangduo@apache.org>
Authored: Sat Feb 4 22:00:48 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Sun Feb 5 08:49:39 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/MetaTableAccessor.java  |  14 +-
 .../hbase/client/AsyncNonMetaRegionLocator.java |   4 +-
 .../hadoop/hbase/client/ClientScanner.java      | 245 +++++++------
 .../client/ClientSmallReversedScanner.java      | 336 ------------------
 .../hadoop/hbase/client/ClientSmallScanner.java | 306 ----------------
 .../hbase/client/ConnectionImplementation.java  |  17 +-
 .../org/apache/hadoop/hbase/client/HTable.java  |  18 +-
 .../hbase/client/ReversedClientScanner.java     |  21 +-
 .../hbase/client/ReversedScannerCallable.java   |   2 +-
 .../org/apache/hadoop/hbase/client/Scan.java    |   9 +
 .../hadoop/hbase/client/ScannerCallable.java    | 265 +++++++-------
 .../client/ScannerCallableWithReplicas.java     |  17 +-
 .../hadoop/hbase/client/TestClientScanner.java  | 112 +++---
 .../client/TestClientSmallReversedScanner.java  | 345 -------------------
 .../hbase/client/TestClientSmallScanner.java    | 335 ------------------
 .../hadoop/hbase/mapreduce/SyncTable.java       | 165 +++++----
 .../hbase/regionserver/RSRpcServices.java       |  34 +-
 .../org/apache/hadoop/hbase/tool/Canary.java    |   5 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |  12 +-
 .../hbase/TestMetaTableAccessorNoCluster.java   |   2 +-
 .../hbase/TestPartialResultsFromClientSide.java |  12 +-
 .../client/TestClientScannerRPCTimeout.java     |   5 +
 .../hbase/client/TestFromClientSide3.java       |   2 +-
 .../hadoop/hbase/client/TestLeaseRenewal.java   |   4 +-
 .../hbase/regionserver/TestMobStoreScanner.java |   4 +-
 .../regionserver/TestScannerWithBulkload.java   |   6 +-
 .../security/access/TestAccessController.java   |   1 -
 .../security/access/TestAccessController2.java  |   3 +
 .../hbase/snapshot/MobSnapshotTestingUtils.java |  36 +-
 29 files changed, 520 insertions(+), 1817 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 1cc7963..5a37afc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -560,16 +560,12 @@ public class MetaTableAccessor {
     // Stop key appends the smallest possible char to the table name
     byte[] stopKey = getTableStopRowForMeta(tableName, QueryType.REGION);
 
-    Scan scan = getMetaScan(connection);
+    Scan scan = getMetaScan(connection, -1);
     scan.setStartRow(startKey);
     scan.setStopRow(stopKey);
     return scan;
   }
 
-  private static Scan getMetaScan(Connection connection) {
-    return getMetaScan(connection, Integer.MAX_VALUE);
-  }
-
   private static Scan getMetaScan(Connection connection, int rowUpperLimit) {
     Scan scan = new Scan();
     int scannerCaching = connection.getConfiguration()
@@ -579,11 +575,11 @@ public class MetaTableAccessor {
         HConstants.DEFAULT_USE_META_REPLICAS)) {
       scan.setConsistency(Consistency.TIMELINE);
     }
-    if (rowUpperLimit <= scannerCaching) {
-      scan.setSmall(true);
+    if (rowUpperLimit > 0) {
+      scan.setLimit(rowUpperLimit);
+      scan.setReadType(Scan.ReadType.PREAD);
     }
-    int rows = Math.min(rowUpperLimit, scannerCaching);
-    scan.setCaching(rows);
+    scan.setCaching(scannerCaching);
     return scan;
   }
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 2c8669f..27e8cc4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -377,8 +377,8 @@ class AsyncNonMetaRegionLocator {
       metaKey = createRegionName(tableName, req.row, NINES, false);
     }
     conn.getRawTable(META_TABLE_NAME)
-        .scanAll(new Scan().withStartRow(metaKey).setReversed(true).setReadType(ReadType.PREAD)
-            .addFamily(CATALOG_FAMILY).setLimit(1))
+        .scanAll(new Scan().withStartRow(metaKey).setReversed(true).addFamily(CATALOG_FAMILY)
+            .setOneRowLimit())
         .whenComplete((results, error) -> onScanComplete(tableName, req, results, error));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index ea91100..313cb63 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -158,16 +160,10 @@ public abstract class ClientScanner extends AbstractClientScanner {
 
     this.conf = conf;
     initCache();
-    initializeScannerInConstruction();
   }
 
   protected abstract void initCache();
 
-  protected void initializeScannerInConstruction() throws IOException {
-    // initialize the scanner
-    nextScanner(this.caching, false);
-  }
-
   protected ClusterConnection getConnection() {
     return this.connection;
   }
@@ -235,31 +231,30 @@ public abstract class ClientScanner extends AbstractClientScanner {
       this.callable = null;
     }
   }
-  /*
+
+  /**
    * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the
-   * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no
-   * further, just tidy up outstanding scanners, if <code>currentRegion != null</code> and
-   * <code>done</code> is true.
-   * @param nbRows
-   * @param done Server-side says we're done scanning.
+   * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow().
+   * @param nbRows the caching option of the scan
+   * @return the results fetched when opening the scanner, or null if the scan should terminate.
    */
-  protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
+  protected Result[] nextScanner(int nbRows) throws IOException {
     // Close the previous scanner if it's open
     closeScanner();
 
     // Where to start the next scanner
     byte[] localStartKey;
 
-    // if we're at end of table, close and return false to stop iterating
+    // if we're at end of table, close and return null to stop iterating
     if (this.currentRegion != null) {
       byte[] endKey = this.currentRegion.getEndKey();
-      if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
-          || checkScanStopRow(endKey) || done) {
+      if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
+          checkScanStopRow(endKey)) {
         close();
         if (LOG.isTraceEnabled()) {
           LOG.trace("Finished " + this.currentRegion);
         }
-        return false;
+        return null;
       }
       localStartKey = endKey;
       // clear mvcc read point if we are going to switch regions
@@ -280,16 +275,23 @@ public abstract class ClientScanner extends AbstractClientScanner {
       callable = getScannerCallable(localStartKey, nbRows);
       // Open a scanner on the region server starting at the
       // beginning of the region
-      call(callable, caller, scannerTimeout);
+      Result[] rrs = call(callable, caller, scannerTimeout);
       this.currentRegion = callable.getHRegionInfo();
       if (this.scanMetrics != null) {
         this.scanMetrics.countOfRegions.incrementAndGet();
       }
+      if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) {
+        // no results for the scan, return null to terminate the scan.
+        closed = true;
+        callable = null;
+        currentRegion = null;
+        return null;
+      }
+      return rrs;
     } catch (IOException e) {
-      close();
+      closeScanner();
       throw e;
     }
-    return true;
   }
 
   @VisibleForTesting
@@ -297,8 +299,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
     return callable.isAnyRPCcancelled();
   }
 
-  Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
-      int scannerTimeout) throws IOException, RuntimeException {
+  private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
+      int scannerTimeout) throws IOException {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
@@ -364,22 +366,19 @@ public abstract class ClientScanner extends AbstractClientScanner {
     return cache != null ? cache.size() : 0;
   }
 
+  private boolean scanExhausted(Result[] values) {
+    // This means the server tells us the whole scan operation is done. Usually decided by filter or
+    // limit.
+    return values == null || callable.moreResultsForScan() == MoreResults.NO;
+  }
+
   private boolean regionExhausted(Result[] values) {
-    // This means the server tells us the whole scan operation is done. Usually decided by filter.
-    if (values == null) {
-      return true;
-    }
-    // Not a heartbeat message and we get nothing, this means the region is exhausted
-    if (values.length == 0 && !callable.isHeartbeatMessage()) {
-      return true;
-    }
-    // Server tells us that it has no more results for this region. Notice that this flag is get
-    // from the ScanResponse.getMoreResultsInRegion, not ScanResponse.getMoreResults. If the latter
-    // one is false then we will get a null values and quit in the first condition of this method.
-    if (callable.hasMoreResultsContext() && !callable.getServerHasMoreResults()) {
-      return true;
-    }
-    return false;
+    // 1. Not a heartbeat message and we get nothing, which means the region is exhausted. In the
+    // past we always returned an empty result for an open scanner operation, so we add a check here
+    // to stay compatible with the old logic. The isOpenScanner check should be removed later.
+    // 2. Server tells us that it has no more results for this region.
+    return (values.length == 0 && !callable.isHeartbeatMessage() && !callable.isOpenScanner())
+        || callable.moreResultsInRegion() == MoreResults.NO;
   }
 
   private void closeScannerIfExhausted(boolean exhausted) throws IOException {
@@ -395,18 +394,87 @@ public abstract class ClientScanner extends AbstractClientScanner {
     }
   }
 
+  private Result[] nextScannerWithRetries(int nbRows) throws IOException {
+    for (;;) {
+      try {
+        return nextScanner(nbRows);
+      } catch (DoNotRetryIOException e) {
+        handleScanError(e, null);
+      }
+    }
+  }
+
+  private void handleScanError(DoNotRetryIOException e,
+      MutableBoolean retryAfterOutOfOrderException) throws DoNotRetryIOException {
+    // An exception was thrown which makes any partial results that we were collecting
+    // invalid. The scanner will need to be reset to the beginning of a row.
+    clearPartialResults();
+    // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
+    // to reset the scanner and come back in again.
+
+    // If exception is any but the list below throw it back to the client; else setup
+    // the scanner and retry.
+    Throwable cause = e.getCause();
+    if ((cause != null && cause instanceof NotServingRegionException) ||
+        (cause != null && cause instanceof RegionServerStoppedException) ||
+        e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException ||
+        e instanceof ScannerResetException) {
+      // Pass. It is easier to write the if condition as a list of what is allowed rather than
+      // as a list of what is not allowed... so if we get here, it means we do not throw.
+    } else {
+      throw e;
+    }
+
+    // Else, it is a signal from the depths of ScannerCallable that we need to reset the scanner.
+    if (this.lastResult != null) {
+      // The region has moved. We need to open a brand new scanner at the new location.
+      // Reset the startRow to the row we've seen last so that the new scanner starts at
+      // the correct row. Otherwise we may see previously returned rows again.
+      // (ScannerCallable by now has "relocated" the correct region)
+      if (!this.lastResult.isPartial() && scan.getBatch() < 0) {
+        if (scan.isReversed()) {
+          scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
+        } else {
+          scan.setStartRow(createClosestRowAfter(lastResult.getRow()));
+        }
+      } else {
+        // we need rescan this row because we only loaded partial row before
+        scan.setStartRow(lastResult.getRow());
+      }
+    }
+    if (e instanceof OutOfOrderScannerNextException) {
+      if (retryAfterOutOfOrderException != null) {
+        if (retryAfterOutOfOrderException.isTrue()) {
+          retryAfterOutOfOrderException.setValue(false);
+        } else {
+          // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
+          throw new DoNotRetryIOException(
+              "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e);
+        }
+      }
+    }
+    // Clear region.
+    this.currentRegion = null;
+    // Set this to null so we don't try to do an rpc and close on the remote server when
+    // the exception we got was UnknownScanner or the server is going down.
+    callable = null;
+  }
+
   /**
    * Contact the servers to load more {@link Result}s in the cache.
    */
   protected void loadCache() throws IOException {
     // check if scanner was closed during previous prefetch
-    if (closed) return;
+    if (closed) {
+      return;
+    }
     Result[] values = null;
     long remainingResultSize = maxScannerResultSize;
     int countdown = this.caching;
     // This is possible if we just stopped at the boundary of a region in the previous call.
     if (callable == null) {
-      if (!nextScanner(countdown, false)) {
+      values = nextScannerWithRetries(countdown);
+      if (values == null) {
         return;
       }
     }
@@ -414,80 +482,39 @@ public abstract class ClientScanner extends AbstractClientScanner {
     callable.setCaching(this.caching);
     // This flag is set when we want to skip the result returned. We do
     // this when we reset scanner because it split under us.
-    boolean retryAfterOutOfOrderException = true;
+    MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
     for (;;) {
       try {
         // Server returns a null values if scanning is to stop. Else,
         // returns an empty array if scanning is to go on and we've just
         // exhausted current region.
-        values = call(callable, caller, scannerTimeout);
+        // We now also fetch data when opening the scanner, so do not make another next call if
+        // values is already non-null.
+        if (values == null) {
+          values = call(callable, caller, scannerTimeout);
+        }
         // When the replica switch happens, we need to do certain operations again.
         // The callable will openScanner with the right startkey but we need to pick up
         // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
         // of the loop as it happens for the cases where we see exceptions.
-        // Since only openScanner would have happened, values would be null
-        if (values == null && callable.switchedToADifferentReplica()) {
+        if (callable.switchedToADifferentReplica()) {
           // Any accumulated partial results are no longer valid since the callable will
           // openScanner with the correct startkey and we must pick up from there
           clearPartialResults();
           this.currentRegion = callable.getHRegionInfo();
-          continue;
-        }
-        retryAfterOutOfOrderException = true;
-      } catch (DoNotRetryIOException e) {
-        // An exception was thrown which makes any partial results that we were collecting
-        // invalid. The scanner will need to be reset to the beginning of a row.
-        clearPartialResults();
-        // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
-        // to reset the scanner and come back in again.
-
-        // If exception is any but the list below throw it back to the client; else setup
-        // the scanner and retry.
-        Throwable cause = e.getCause();
-        if ((cause != null && cause instanceof NotServingRegionException) ||
-            (cause != null && cause instanceof RegionServerStoppedException) ||
-            e instanceof OutOfOrderScannerNextException ||
-            e instanceof UnknownScannerException ||
-            e instanceof ScannerResetException) {
-          // Pass. It is easier writing the if loop test as list of what is allowed rather than
-          // as a list of what is not allowed... so if in here, it means we do not throw.
-        } else {
-          throw e;
-        }
-
-        // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
-        if (this.lastResult != null) {
-          // The region has moved. We need to open a brand new scanner at the new location.
-          // Reset the startRow to the row we've seen last so that the new scanner starts at
-          // the correct row. Otherwise we may see previously returned rows again.
-          // (ScannerCallable by now has "relocated" the correct region)
-          if (!this.lastResult.isPartial() && scan.getBatch() < 0) {
-            if (scan.isReversed()) {
-              scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
-            } else {
-              scan.setStartRow(createClosestRowAfter(lastResult.getRow()));
-            }
-          } else {
-            // we need rescan this row because we only loaded partial row before
-            scan.setStartRow(lastResult.getRow());
-          }
-        }
-        if (e instanceof OutOfOrderScannerNextException) {
-          if (retryAfterOutOfOrderException) {
-            retryAfterOutOfOrderException = false;
-          } else {
-            // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
-            throw new DoNotRetryIOException("Failed after retry of " +
-                "OutOfOrderScannerNextException: was there a rpc timeout?", e);
+          // We now also fetch data when opening the scanner, so usually we should not get a null
+          // result, but in some places we still use null to indicate the scan is terminated, so add
+          // a sanity check here. This check should be removed later.
+          if (values == null) {
+            continue;
           }
         }
-        // Clear region.
-        this.currentRegion = null;
-        // Set this to zero so we don't try and do an rpc and close on remote server when
-        // the exception we got was UnknownScanner or the Server is going down.
-        callable = null;
+        retryAfterOutOfOrderException.setValue(true);
+      } catch (DoNotRetryIOException e) {
+        handleScanError(e, retryAfterOutOfOrderException);
         // reopen the scanner
-        if (!nextScanner(countdown, false)) {
+        values = nextScannerWithRetries(countdown);
+        if (values == null) {
           break;
         }
         continue;
@@ -521,8 +548,18 @@ public abstract class ClientScanner extends AbstractClientScanner {
             this.lastCellLoadedToCache = null;
           }
         }
+        if (scan.getLimit() > 0) {
+          int limit = scan.getLimit() - resultsToAddToCache.size();
+          assert limit >= 0;
+          scan.setLimit(limit);
+        }
       }
-      boolean exhausted = regionExhausted(values);
+      if (scanExhausted(values)) {
+        closeScanner();
+        closed = true;
+        break;
+      }
+      boolean regionExhausted = regionExhausted(values);
       if (callable.isHeartbeatMessage()) {
         if (!cache.isEmpty()) {
           // Caller of this method just wants a Result. If we see a heartbeat message, it means
@@ -540,12 +577,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
       }
       if (countdown <= 0) {
         // we have enough result.
-        closeScannerIfExhausted(exhausted);
+        closeScannerIfExhausted(regionExhausted);
         break;
       }
       if (remainingResultSize <= 0) {
         if (!cache.isEmpty()) {
-          closeScannerIfExhausted(exhausted);
+          closeScannerIfExhausted(regionExhausted);
           break;
         } else {
           // we have reached the max result size but we still can not find anything to return to the
@@ -554,17 +591,21 @@ public abstract class ClientScanner extends AbstractClientScanner {
         }
       }
       // we are done with the current region
-      if (exhausted) {
+      if (regionExhausted) {
         if (!partialResults.isEmpty()) {
           // XXX: continue if there are partial results. But in fact server should not set
           // hasMoreResults to false if there are partial results.
           LOG.warn("Server tells us there is no more results for this region but we still have"
               + " partialResults, this should not happen, retry on the current scanner anyway");
+          values = null; // reset values for the next call
           continue;
         }
-        if (!nextScanner(countdown, values == null)) {
+        values = nextScannerWithRetries(countdown);
+        if (values == null) {
           break;
         }
+      } else {
+        values = null; // reset values for the next call
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
deleted file mode 100644
index 8f0c2f8..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * <p>
- * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
- * scan results, unless the results cross multiple regions or the row count of
- * results exceed the caching.
- * </p>
- * For small scan, it will get better performance than {@link ReversedClientScanner}
- */
-@InterfaceAudience.Private
-public class ClientSmallReversedScanner extends ReversedClientScanner {
-  private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
-  private ScannerCallableWithReplicas smallReversedScanCallable = null;
-  private SmallReversedScannerCallableFactory callableFactory;
-
-  /**
-   * Create a new ReversibleClientScanner for the specified table. Take note that the passed
-   * {@link Scan} 's start row maybe changed changed.
-   *
-   * @param conf
-   *          The {@link Configuration} to use.
-   * @param scan
-   *          {@link Scan} to use in this scanner
-   * @param tableName
-   *          The table that we wish to rangeGet
-   * @param connection
-   *          Connection identifying the cluster
-   * @param rpcFactory
-   *          Factory used to create the {@link RpcRetryingCaller}
-   * @param controllerFactory
-   *          Factory used to access RPC payloads
-   * @param pool
-   *          Threadpool for RPC threads
-   * @param primaryOperationTimeout
-   *          Call timeout
-   * @throws IOException
-   *           If the remote call fails
-   */
-  public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
-      final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
-      throws IOException {
-    this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-        primaryOperationTimeout, new SmallReversedScannerCallableFactory());
-  }
-
-  /**
-   * Create a new ReversibleClientScanner for the specified table. Take note that the passed
-   * {@link Scan}'s start row may be changed.
-   *
-   * @param conf
-   *          The {@link Configuration} to use.
-   * @param scan
-   *          {@link Scan} to use in this scanner
-   * @param tableName
-   *          The table that we wish to rangeGet
-   * @param connection
-   *          Connection identifying the cluster
-   * @param rpcFactory
-   *          Factory used to create the {@link RpcRetryingCaller}
-   * @param controllerFactory
-   *          Factory used to access RPC payloads
-   * @param pool
-   *          Threadpool for RPC threads
-   * @param primaryOperationTimeout
-   *          Call timeout
-   * @param callableFactory
-   *          Factory used to create the {@link SmallScannerCallable}
-   * @throws IOException
-   *           If the remote call fails
-   */
-  @VisibleForTesting
-  ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
-      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
-      SmallReversedScannerCallableFactory callableFactory) throws IOException {
-    super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-        primaryOperationTimeout);
-    this.callableFactory = callableFactory;
-  }
-
-  /**
-   * Gets a scanner for following scan. Move to next region or continue from the last result or
-   * start from the start row.
-   *
-   * @param nbRows
-   * @param done
-   *          true if Server-side says we're done scanning.
-   * @param currentRegionDone
-   *          true if scan is over on current region
-   * @return true if has next scanner
-   * @throws IOException
-   */
-  private boolean nextScanner(int nbRows, final boolean done,
-                              boolean currentRegionDone) throws IOException {
-    // Where to start the next getter
-    byte[] localStartKey;
-    int cacheNum = nbRows;
-    boolean regionChanged = true;
-    boolean isFirstRegionToLocate = false;
-    // if we're at end of table, close and return false to stop iterating
-    if (this.currentRegion != null && currentRegionDone) {
-      byte[] startKey = this.currentRegion.getStartKey();
-      if (startKey == null
-          || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
-          || checkScanStopRow(startKey) || done) {
-        close();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Finished with small scan at " + this.currentRegion);
-        }
-        return false;
-      }
-      // We take the row just under to get to the previous region.
-      localStartKey = createClosestRowBefore(startKey);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Finished with region " + this.currentRegion);
-      }
-    } else if (this.lastResult != null) {
-      regionChanged = false;
-      localStartKey = createClosestRowBefore(lastResult.getRow());
-    } else {
-      isFirstRegionToLocate = true;
-      localStartKey = this.scan.getStartRow();
-    }
-
-    if (!isFirstRegionToLocate
-        && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
-      // when non-firstRegion & localStartKey is empty bytes, no more rowKey should scan.
-      // otherwise, maybe infinity results with RowKey=0x00 will return.
-      return false;
-    }
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Advancing internal small scanner to startKey at '"
-          + Bytes.toStringBinary(localStartKey) + "'");
-    }
-
-    smallReversedScanCallable =
-        callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(),
-          localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
-          getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate);
-
-    if (this.scanMetrics != null && regionChanged) {
-      this.scanMetrics.countOfRegions.incrementAndGet();
-    }
-    return true;
-  }
-
-  @Override
-  public Result next() throws IOException {
-    // If the scanner is closed and there's nothing left in the cache, next is a
-    // no-op.
-    if (cache.isEmpty() && this.closed) {
-      return null;
-    }
-    if (cache.isEmpty()) {
-      loadCache();
-    }
-
-    if (cache.size() > 0) {
-      return cache.poll();
-    }
-    // if we exhausted this scanner before calling close, write out the scan
-    // metrics
-    writeScanMetrics();
-    return null;
-  }
-
-  @Override
-  protected void loadCache() throws IOException {
-    Result[] values = null;
-    long remainingResultSize = maxScannerResultSize;
-    int countdown = this.caching;
-    boolean currentRegionDone = false;
-    // Values == null means server-side filter has determined we must STOP
-    while (remainingResultSize > 0 && countdown > 0
-        && nextScanner(countdown, values == null, currentRegionDone)) {
-      // Server returns a null values if scanning is to stop. Else,
-      // returns an empty array if scanning is to go on and we've just
-      // exhausted current region.
-      // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
-      // we do a callWithRetries
-      values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout);
-      this.currentRegion = smallReversedScanCallable.getHRegionInfo();
-      long currentTime = System.currentTimeMillis();
-      if (this.scanMetrics != null) {
-        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
-            - lastNext);
-      }
-      lastNext = currentTime;
-      if (values != null && values.length > 0) {
-        for (int i = 0; i < values.length; i++) {
-          Result rs = values[i];
-          cache.add(rs);
-          // We don't make Iterator here
-          for (Cell cell : rs.rawCells()) {
-            remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
-          }
-          countdown--;
-          this.lastResult = rs;
-        }
-      }
-      if (smallReversedScanCallable.hasMoreResultsContext()) {
-        currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults();
-      } else {
-        currentRegionDone = countdown > 0;
-      }
-    }
-  }
-
-  @Override
-  protected void initializeScannerInConstruction() throws IOException {
-    // No need to initialize the scanner when constructing instance, do it when
-    // calling next(). Do nothing here.
-  }
-
-  @Override
-  public void close() {
-    if (!scanMetricsPublished) writeScanMetrics();
-    closed = true;
-  }
-
-  /**
-   * A reversed ScannerCallable which supports backward small scanning.
-   */
-  static class SmallReversedScannerCallable extends ReversedScannerCallable {
-
-    public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan,
-        ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory,
-        int caching, int replicaId) {
-      super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId);
-      this.setCaching(caching);
-    }
-
-    @Override
-    protected Result[] rpcCall() throws Exception {
-      if (this.closed) return null;
-      if (Thread.interrupted()) {
-        throw new InterruptedIOException();
-      }
-      ClientProtos.ScanRequest request = RequestConverter.buildScanRequest(
-        getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true);
-      ClientProtos.ScanResponse response = null;
-      response = getStub().scan(getRpcController(), request);
-      Result[] results = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
-      if (response.hasMoreResultsInRegion()) {
-        setHasMoreResultsContext(true);
-        setServerHasMoreResults(response.getMoreResultsInRegion());
-      } else {
-        setHasMoreResultsContext(false);
-      }
-      // We need to update result metrics since we are overriding call()
-      updateResultsMetrics(results);
-      return results;
-    }
-
-    @Override
-    public ScannerCallable getScannerCallableForReplica(int id) {
-      return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(),
-          scanMetrics, locateStartRow, rpcControllerFactory, getCaching(), id);
-    }
-  }
-
-  @VisibleForTesting
-  protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) {
-    this.callableFactory = callableFactory;
-  }
-
-  protected static class SmallReversedScannerCallableFactory {
-
-    public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
-        Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
-        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
-        int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
-        boolean isFirstRegionToLocate) {
-      byte[] locateStartRow = null;
-      if (isFirstRegionToLocate
-          && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
-        // HBASE-16886: if not setting startRow, then we will use a range [MAX_BYTE_ARRAY, +oo) to
-        // locate a region list, and the last one in region list is the region where our scan start.
-        locateStartRow = ConnectionUtils.MAX_BYTE_ARRAY;
-      }
-
-      scan.setStartRow(localStartKey);
-      SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan,
-          scanMetrics, locateStartRow, controllerFactory, cacheNum, 0);
-      ScannerCallableWithReplicas scannerCallableWithReplicas =
-          new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan,
-              retries, scannerTimeout, cacheNum, conf, caller);
-      return scannerCallableWithReplicas;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
deleted file mode 100644
index 52a291b..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Client scanner for small scan. Generally, only one RPC is called to fetch the
- * scan results, unless the results cross multiple regions or the row count of
- * results excess the caching.
- *
- * For small scan, it will get better performance than {@link ClientScanner}
- */
-@InterfaceAudience.Private
-public class ClientSmallScanner extends ClientSimpleScanner {
-  private static final Log LOG = LogFactory.getLog(ClientSmallScanner.class);
-  private ScannerCallableWithReplicas smallScanCallable = null;
-  private SmallScannerCallableFactory callableFactory;
-
-  /**
-   * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
-   * 's start row maybe changed changed.
-   *
-   * @param conf
-   *          The {@link Configuration} to use.
-   * @param scan
-   *          {@link Scan} to use in this scanner
-   * @param tableName
-   *          The table that we wish to rangeGet
-   * @param connection
-   *          Connection identifying the cluster
-   * @param rpcFactory
-   *          Factory used to create the {@link RpcRetryingCaller}
-   * @param controllerFactory
-   *          Factory used to access RPC payloads
-   * @param pool
-   *          Threadpool for RPC threads
-   * @param primaryOperationTimeout
-   *          Call timeout
-   * @throws IOException
-   *           If the remote call fails
-   */
-  public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
-      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
-      throws IOException {
-    this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-        primaryOperationTimeout, new SmallScannerCallableFactory());
-  }
-
-  /**
-   * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
-   * 's start row maybe changed changed. Intended for unit tests to provide their own
-   * {@link SmallScannerCallableFactory} implementation/mock.
-   *
-   * @param conf
-   *          The {@link Configuration} to use.
-   * @param scan
-   *          {@link Scan} to use in this scanner
-   * @param tableName
-   *          The table that we wish to rangeGet
-   * @param connection
-   *          Connection identifying the cluster
-   * @param rpcFactory
-   *          Factory used to create the {@link RpcRetryingCaller}
-   * @param controllerFactory
-   *          Factory used to access RPC payloads
-   * @param pool
-   *          Threadpool for RPC threads
-   * @param primaryOperationTimeout
-   *          Call timeout
-   * @param callableFactory
-   *          Factory used to create the {@link SmallScannerCallable}
-   * @throws IOException
-   */
-  @VisibleForTesting
-  ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
-      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
-      SmallScannerCallableFactory callableFactory) throws IOException {
-    super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-        primaryOperationTimeout);
-    this.callableFactory = callableFactory;
-  }
-
-  @Override
-  protected void initializeScannerInConstruction() throws IOException {
-    // No need to initialize the scanner when constructing instance, do it when
-    // calling next(). Do nothing here.
-  }
-
-  /**
-   * Gets a scanner for following scan. Move to next region or continue from the
-   * last result or start from the start row.
-   * @param nbRows
-   * @param done true if Server-side says we're done scanning.
-   * @param currentRegionDone true if scan is over on current region
-   * @return true if has next scanner
-   * @throws IOException
-   */
-  private boolean nextScanner(int nbRows, final boolean done,
-      boolean currentRegionDone) throws IOException {
-    // Where to start the next getter
-    byte[] localStartKey;
-    int cacheNum = nbRows;
-    boolean regionChanged = true;
-    // if we're at end of table, close and return false to stop iterating
-    if (this.currentRegion != null && currentRegionDone) {
-      byte[] endKey = this.currentRegion.getEndKey();
-      if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
-          || checkScanStopRow(endKey) || done) {
-        close();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Finished with small scan at " + this.currentRegion);
-        }
-        return false;
-      }
-      localStartKey = endKey;
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Finished with region " + this.currentRegion);
-      }
-    } else if (this.lastResult != null) {
-      regionChanged = false;
-      localStartKey = Bytes.add(lastResult.getRow(), new byte[1]);
-    } else {
-      localStartKey = this.scan.getStartRow();
-    }
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Advancing internal small scanner to startKey at '"
-          + Bytes.toStringBinary(localStartKey) + "'");
-    }
-    smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
-        getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
-        getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
-    if (this.scanMetrics != null && regionChanged) {
-      this.scanMetrics.countOfRegions.incrementAndGet();
-    }
-    return true;
-  }
-
-  static class SmallScannerCallable extends ScannerCallable {
-    public SmallScannerCallable(
-        ClusterConnection connection, TableName table, Scan scan,
-        ScanMetrics scanMetrics, RpcControllerFactory controllerFactory, int caching, int id) {
-      super(connection, table, scan, scanMetrics, controllerFactory, id);
-      this.setCaching(caching);
-    }
-
-    @Override
-    protected Result[] rpcCall() throws Exception {
-      if (this.closed) return null;
-      if (Thread.interrupted()) {
-        throw new InterruptedIOException();
-      }
-      ScanRequest request = RequestConverter.buildScanRequest(getLocation()
-          .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
-      ScanResponse response = null;
-      response = getStub().scan(getRpcController(), request);
-      Result[] results = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
-      if (response.hasMoreResultsInRegion()) {
-        setHasMoreResultsContext(true);
-        setServerHasMoreResults(response.getMoreResultsInRegion());
-      } else {
-        setHasMoreResultsContext(false);
-      }
-      // We need to update result metrics since we are overriding call()
-      updateResultsMetrics(results);
-      return results;
-    }
-
-    @Override
-    public ScannerCallable getScannerCallableForReplica(int id) {
-      return new SmallScannerCallable((ClusterConnection)getConnection(), getTableName(), getScan(),
-          scanMetrics, rpcControllerFactory, getCaching(), id);
-    }
-  }
-
-  @Override
-  public Result next() throws IOException {
-    // If the scanner is closed and there's nothing left in the cache, next is a
-    // no-op.
-    if (cache.isEmpty() && this.closed) {
-      return null;
-    }
-    if (cache.isEmpty()) {
-      loadCache();
-    }
-
-    if (cache.size() > 0) {
-      return cache.poll();
-    }
-    // if we exhausted this scanner before calling close, write out the scan
-    // metrics
-    writeScanMetrics();
-    return null;
-  }
-
-  @Override
-  protected void loadCache() throws IOException {
-    Result[] values = null;
-    long remainingResultSize = maxScannerResultSize;
-    int countdown = this.caching;
-    boolean currentRegionDone = false;
-    // Values == null means server-side filter has determined we must STOP
-    while (remainingResultSize > 0 && countdown > 0
-        && nextScanner(countdown, values == null, currentRegionDone)) {
-      // Server returns a null values if scanning is to stop. Else,
-      // returns an empty array if scanning is to go on and we've just
-      // exhausted current region.
-      // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
-      // we do a callWithRetries
-      values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
-      this.currentRegion = smallScanCallable.getHRegionInfo();
-      long currentTime = System.currentTimeMillis();
-      if (this.scanMetrics != null) {
-        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
-            - lastNext);
-      }
-      lastNext = currentTime;
-      if (values != null && values.length > 0) {
-        for (int i = 0; i < values.length; i++) {
-          Result rs = values[i];
-          cache.add(rs);
-          // We don't make Iterator here
-          for (Cell cell : rs.rawCells()) {
-            remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
-          }
-          countdown--;
-          this.lastResult = rs;
-        }
-      }
-      if (smallScanCallable.hasMoreResultsContext()) {
-        // If the server has more results, the current region is not done
-        currentRegionDone = !smallScanCallable.getServerHasMoreResults();
-      } else {
-        // not guaranteed to get the context in older versions, fall back to checking countdown
-        currentRegionDone = countdown > 0;
-      }
-    }
-  }
-
-  public void close() {
-    if (!scanMetricsPublished) writeScanMetrics();
-    closed = true;
-  }
-
-  @VisibleForTesting
-  protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
-    this.callableFactory = callableFactory;
-  }
-
-  @InterfaceAudience.Private
-  protected static class SmallScannerCallableFactory {
-
-    public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
-        Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
-        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
-        int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller) {
-      scan.setStartRow(localStartKey);
-      SmallScannerCallable s = new SmallScannerCallable(
-        connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0);
-      ScannerCallableWithReplicas scannerCallableWithReplicas =
-          new ScannerCallableWithReplicas(table, connection,
-              s, pool, primaryOperationTimeout, scan, retries,
-              scannerTimeout, cacheNum, conf, caller);
-      return scannerCallableWithReplicas;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 9142ec9..f0b755f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -789,8 +789,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     s.setReversed(true);
     s.setStartRow(metaKey);
     s.addFamily(HConstants.CATALOG_FAMILY);
-    s.setSmall(true);
-    s.setCaching(1);
+    s.setOneRowLimit();
     if (this.useMetaReplicas) {
       s.setConsistency(Consistency.TIMELINE);
     }
@@ -818,21 +817,16 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       long pauseBase = this.pause;
       try {
         Result regionInfoRow = null;
-        ReversedClientScanner rcs = null;
-        try {
-          rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
-            rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
+        s.resetMvccReadPoint();
+        try (ReversedClientScanner rcs =
+            new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
+                rpcControllerFactory, getMetaLookupPool(), 0)) {
           regionInfoRow = rcs.next();
-        } finally {
-          if (rcs != null) {
-            rcs.close();
-          }
         }
 
         if (regionInfoRow == null) {
           throw new TableNotFoundException(tableName);
         }
-
         // convert the row result into the HRegionLocation we need!
         RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
         if (locations == null || locations.getRegionLocation(replicaId) == null) {
@@ -886,7 +880,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
         throw e;
       } catch (IOException e) {
         ExceptionUtil.rethrowIfInterrupt(e);
-
         if (e instanceof RemoteException) {
           e = ((RemoteException)e).unwrapRemoteException();
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 72d71eb..1fa33b0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -350,21 +350,9 @@ public class HTable implements Table {
     }
 
     if (scan.isReversed()) {
-      if (scan.isSmall()) {
-        return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
-            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
-            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
-      } else {
-        return new ReversedClientScanner(getConfiguration(), scan, getName(),
-            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
-            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
-      }
-    }
-
-    if (scan.isSmall()) {
-      return new ClientSmallScanner(getConfiguration(), scan, getName(),
-          this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
-          pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
+      return new ReversedClientScanner(getConfiguration(), scan, getName(),
+        this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
+        pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
     } else {
       if (async) {
         return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index e1a522a..c308dd4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -60,8 +61,7 @@ public class ReversedClientScanner extends ClientSimpleScanner {
   }
 
   @Override
-  protected boolean nextScanner(int nbRows, final boolean done)
-      throws IOException {
+  protected Result[] nextScanner(int nbRows) throws IOException {
     // Close the previous scanner if it's open
     closeScanner();
 
@@ -71,16 +71,17 @@ public class ReversedClientScanner extends ClientSimpleScanner {
     // if we're at start of table, close and return false to stop iterating
     if (this.currentRegion != null) {
       byte[] startKey = this.currentRegion.getStartKey();
-      if (startKey == null
-          || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
-          || checkScanStopRow(startKey) || done) {
+      if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
+          || checkScanStopRow(startKey)) {
         close();
         if (LOG.isDebugEnabled()) {
           LOG.debug("Finished " + this.currentRegion);
         }
-        return false;
+        return null;
       }
       localStartKey = startKey;
+      // clear mvcc read point if we are going to switch regions
+      scan.resetMvccReadPoint();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Finished " + this.currentRegion);
       }
@@ -111,17 +112,21 @@ public class ReversedClientScanner extends ClientSimpleScanner {
       // beginning of the region
       // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
       // we do a callWithRetries
-      this.caller.callWithoutRetries(callable, scannerTimeout);
+      Result[] rrs = this.caller.callWithoutRetries(callable, scannerTimeout);
       this.currentRegion = callable.getHRegionInfo();
       if (this.scanMetrics != null) {
         this.scanMetrics.countOfRegions.incrementAndGet();
       }
+      if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) {
+        // no results for the scan, return null to terminate the scan.
+        return null;
+      }
+      return rrs;
     } catch (IOException e) {
       ExceptionUtil.rethrowIfInterrupt(e);
       close();
       throw e;
     }
-    return true;
   }
 
   protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index c7d78c6..195bcba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -42,7 +42,7 @@ public class ReversedScannerCallable extends ScannerCallable {
   /**
    * The start row for locating regions. In reversed scanner, may locate the
    * regions for a range of keys when doing
-   * {@link ReversedClientScanner#nextScanner(int, boolean)}
+   * {@link ReversedClientScanner#nextScanner(int)}
    */
   protected final byte[] locateStartRow;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 31e76da..c4b7044 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -1119,6 +1119,15 @@ public class Scan extends Query {
     return this;
   }
 
+  /**
+   * Call this when you only want to get one row. It will set {@code limit} to {@code 1}, and also
+   * set {@code readType} to {@link ReadType#PREAD}.
+   * @return this
+   */
+  public Scan setOneRowLimit() {
+    return setLimit(1).setReadType(ReadType.PREAD);
+  }
+
   @InterfaceAudience.Public
   @InterfaceStability.Unstable
   public enum ReadType {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index f867acb..3ef68ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -42,13 +42,12 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 
 /**
@@ -75,9 +74,15 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
   private int logCutOffLatency = 1000;
   private static String myAddress;
   protected final int id;
-  protected boolean serverHasMoreResultsContext;
-  protected boolean serverHasMoreResults;
 
+  enum MoreResults {
+    YES, NO, UNKNOWN
+  }
+
+  private MoreResults moreResultsInRegion;
+  private MoreResults moreResultsForScan;
+
+  private boolean openScanner;
   /**
    * Saves whether or not the most recent response from the server was a heartbeat message.
    * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
@@ -174,120 +179,121 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
     }
   }
 
+  private ScanResponse next() throws IOException {
+    // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
+    setHeartbeatMessage(false);
+    incRPCcallsMetrics();
+    ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
+      this.scanMetrics != null, renew, scan.getLimit());
+    try {
+      ScanResponse response = getStub().scan(getRpcController(), request);
+      nextCallSeq++;
+      return response;
+    } catch (Exception e) {
+      IOException ioe = ProtobufUtil.handleRemoteException(e);
+      if (logScannerActivity) {
+        LOG.info(
+          "Got exception making request " + ProtobufUtil.toText(request) + " to " + getLocation(),
+          e);
+      }
+      if (logScannerActivity) {
+        if (ioe instanceof UnknownScannerException) {
+          try {
+            HRegionLocation location =
+                getConnection().relocateRegion(getTableName(), scan.getStartRow());
+            LOG.info("Scanner=" + scannerId + " expired, current region location is "
+                + location.toString());
+          } catch (Throwable t) {
+            LOG.info("Failed to relocate region", t);
+          }
+        } else if (ioe instanceof ScannerResetException) {
+          LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
+              + "asked us to reset the scanner state.",
+            ioe);
+        }
+      }
+      // The below conversion of exceptions into DoNotRetryExceptions is a little strange.
+      // Why not just have these exceptions implement DNRIOE you ask? Well, usually we want
+      // ServerCallable#withRetries to just retry when it gets these exceptions. In here in
+      // a scan when doing a next in particular, we want to break out and get the scanner to
+      // reset itself up again. Throwing a DNRIOE is how we signal this to happen (it's ugly,
+      // yeah, and hard to follow and in need of a refactor).
+      if (ioe instanceof NotServingRegionException) {
+        // Throw a DNRE so that we break out of cycle of calling NSRE
+        // when what we need is to open scanner against new location.
+        // Attach NSRE to signal client that it needs to re-setup scanner.
+        if (this.scanMetrics != null) {
+          this.scanMetrics.countOfNSRE.incrementAndGet();
+        }
+        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
+      } else if (ioe instanceof RegionServerStoppedException) {
+        // Throw a DNRE so that we break out of cycle of the retries and instead go and
+        // open scanner against new location.
+        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
+      } else {
+        // The outer layers will retry
+        throw ioe;
+      }
+    }
+  }
+
+  private void setAlreadyClosed() {
+    this.scannerId = -1L;
+    this.closed = true;
+  }
+
   @Override
-  protected Result [] rpcCall() throws Exception {
+  protected Result[] rpcCall() throws Exception {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-    if (this.closed) {
-      if (this.scannerId != -1) {
-        close();
+    if (closed) {
+      close();
+      return null;
+    }
+    ScanResponse response;
+    if (this.scannerId == -1L) {
+      this.openScanner = true;
+      response = openScanner();
+    } else {
+      this.openScanner = false;
+      response = next();
+    }
+    long timestamp = System.currentTimeMillis();
+    setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
+    Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
+    if (logScannerActivity) {
+      long now = System.currentTimeMillis();
+      if (now - timestamp > logCutOffLatency) {
+        int rows = rrs == null ? 0 : rrs.length;
+        LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner="
+            + scannerId);
+      }
+    }
+    updateServerSideMetrics(response);
+    // moreResults is only used for the case where a filter exhausts all elements
+    if (response.hasMoreResults()) {
+      if (response.getMoreResults()) {
+        setMoreResultsForScan(MoreResults.YES);
+      } else {
+        setMoreResultsForScan(MoreResults.NO);
+        setAlreadyClosed();
       }
     } else {
-      if (this.scannerId == -1L) {
-        this.scannerId = openScanner();
+      setMoreResultsForScan(MoreResults.UNKNOWN);
+    }
+    if (response.hasMoreResultsInRegion()) {
+      if (response.getMoreResultsInRegion()) {
+        setMoreResultsInRegion(MoreResults.YES);
       } else {
-        Result [] rrs = null;
-        ScanRequest request = null;
-        // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
-        setHeartbeatMessage(false);
-        try {
-          incRPCcallsMetrics();
-          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
-            this.scanMetrics != null, renew, -1);
-          ScanResponse response = null;
-          response = getStub().scan(getRpcController(), request);
-          // Client and RS maintain a nextCallSeq number during the scan. Every next() call
-          // from client to server will increment this number in both sides. Client passes this
-          // number along with the request and at RS side both the incoming nextCallSeq and its
-          // nextCallSeq will be matched. In case of a timeout this increment at the client side
-          // should not happen. If at the server side fetching of next batch of data was over,
-          // there will be mismatch in the nextCallSeq number. Server will throw
-          // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
-          // as the last successfully retrieved row.
-          // See HBASE-5974
-          nextCallSeq++;
-          long timestamp = System.currentTimeMillis();
-          setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
-          rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
-          if (logScannerActivity) {
-            long now = System.currentTimeMillis();
-            if (now - timestamp > logCutOffLatency) {
-              int rows = rrs == null ? 0 : rrs.length;
-              LOG.info("Took " + (now-timestamp) + "ms to fetch "
-                  + rows + " rows from scanner=" + scannerId);
-            }
-          }
-          updateServerSideMetrics(response);
-          // moreResults is only used for the case where a filter exhausts all elements
-          if (response.hasMoreResults() && !response.getMoreResults()) {
-            this.scannerId = -1L;
-            this.closed = true;
-            // Implied that no results were returned back, either.
-            return null;
-          }
-          // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
-          // to size or quantity of results in the response.
-          if (response.hasMoreResultsInRegion()) {
-            // Set what the RS said
-            setHasMoreResultsContext(true);
-            setServerHasMoreResults(response.getMoreResultsInRegion());
-          } else {
-            // Server didn't respond whether it has more results or not.
-            setHasMoreResultsContext(false);
-          }
-          updateResultsMetrics(rrs);
-        } catch (IOException e) {
-          if (logScannerActivity) {
-            LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " +
-                getLocation(), e);
-          }
-          IOException ioe = e;
-          if (e instanceof RemoteException) {
-            ioe = ((RemoteException) e).unwrapRemoteException();
-          }
-          if (logScannerActivity) {
-            if (ioe instanceof UnknownScannerException) {
-              try {
-                HRegionLocation location =
-                    getConnection().relocateRegion(getTableName(), scan.getStartRow());
-                LOG.info("Scanner=" + scannerId + " expired, current region location is " +
-                    location.toString());
-              } catch (Throwable t) {
-                LOG.info("Failed to relocate region", t);
-              }
-            } else if (ioe instanceof ScannerResetException) {
-              LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
-                  + "asked us to reset the scanner state.", ioe);
-            }
-          }
-          // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
-          // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
-          // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
-          // a scan when doing a next in particular, we want to break out and get the scanner to
-          // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
-          // yeah and hard to follow and in need of a refactor).
-          if (ioe instanceof NotServingRegionException) {
-            // Throw a DNRE so that we break out of cycle of calling NSRE
-            // when what we need is to open scanner against new location.
-            // Attach NSRE to signal client that it needs to re-setup scanner.
-            if (this.scanMetrics != null) {
-              this.scanMetrics.countOfNSRE.incrementAndGet();
-            }
-            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
-          } else if (ioe instanceof RegionServerStoppedException) {
-            // Throw a DNRE so that we break out of cycle of the retries and instead go and
-            // open scanner against new location.
-            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
-          } else {
-            // The outer layers will retry
-            throw ioe;
-          }
-        }
-        return rrs;
+        setMoreResultsInRegion(MoreResults.NO);
+        setAlreadyClosed();
       }
+    } else {
+      setMoreResultsInRegion(MoreResults.UNKNOWN);
     }
-    return null;
+    updateResultsMetrics(rrs);
+    return rrs;
   }
 
   /**
@@ -296,11 +302,11 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
    *         timeouts during long running scan operations.
    */
-  protected boolean isHeartbeatMessage() {
+  boolean isHeartbeatMessage() {
     return heartbeatMessage;
   }
 
-  protected void setHeartbeatMessage(boolean heartbeatMessage) {
+  private void setHeartbeatMessage(boolean heartbeatMessage) {
     this.heartbeatMessage = heartbeatMessage;
   }
 
@@ -367,10 +373,10 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
     this.scannerId = -1L;
   }
 
-  protected long openScanner() throws IOException {
+  private ScanResponse openScanner() throws IOException {
     incRPCcallsMetrics();
     ScanRequest request = RequestConverter.buildScanRequest(
-        getLocation().getRegionInfo().getRegionName(), this.scan, 0, false);
+      getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
     try {
       ScanResponse response = getStub().scan(getRpcController(), request);
       long id = response.getScannerId();
@@ -381,7 +387,8 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
       if (response.hasMvccReadPoint()) {
         this.scan.setMvccReadPoint(response.getMvccReadPoint());
       }
-      return id;
+      this.scannerId = id;
+      return response;
     } catch (Exception e) {
       throw ProtobufUtil.handleRemoteException(e);
     }
@@ -443,27 +450,31 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
 
   /**
    * Should the client attempt to fetch more results from this region
-   * @return True if the client should attempt to fetch more results, false otherwise.
    */
-  protected boolean getServerHasMoreResults() {
-    assert serverHasMoreResultsContext;
-    return this.serverHasMoreResults;
+  MoreResults moreResultsInRegion() {
+    return moreResultsInRegion;
   }
 
-  protected void setServerHasMoreResults(boolean serverHasMoreResults) {
-    this.serverHasMoreResults = serverHasMoreResults;
+  void setMoreResultsInRegion(MoreResults moreResults) {
+    this.moreResultsInRegion = moreResults;
   }
 
   /**
-   * Did the server respond with information about whether more results might exist.
-   * Not guaranteed to respond with older server versions
-   * @return True if the server responded with information about more results.
+   * Should the client attempt to fetch more results for the whole scan.
    */
-  protected boolean hasMoreResultsContext() {
-    return serverHasMoreResultsContext;
+  MoreResults moreResultsForScan() {
+    return moreResultsForScan;
+  }
+
+  void setMoreResultsForScan(MoreResults moreResults) {
+    this.moreResultsForScan = moreResults;
   }
 
-  protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
-    this.serverHasMoreResultsContext = serverHasMoreResultsContext;
+  /**
+   * Whether the previous call was openScanner. This is used to stay compatible with the old
+   * implementation, which always returned an empty result for openScanner.
+   */
+  boolean isOpenScanner() {
+    return openScanner;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index e04fd6e..c99fe9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -113,20 +114,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     return currentScannerCallable.getHRegionInfo();
   }
 
-  public boolean getServerHasMoreResults() {
-    return currentScannerCallable.getServerHasMoreResults();
+  public MoreResults moreResultsInRegion() {
+    return currentScannerCallable.moreResultsInRegion();
   }
 
-  public void setServerHasMoreResults(boolean serverHasMoreResults) {
-    currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
+  public MoreResults moreResultsForScan() {
+    return currentScannerCallable.moreResultsForScan();
   }
 
-  public boolean hasMoreResultsContext() {
-    return currentScannerCallable.hasMoreResultsContext();
-  }
-
-  public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
-    currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
+  public boolean isOpenScanner() {
+    return currentScannerCallable.isOpenScanner();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index 4319b9a..cf0e995 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.After;
@@ -86,6 +89,7 @@ public class TestClientScanner {
 
     private boolean rpcFinished = false;
     private boolean rpcFinishedFired = false;
+    private boolean initialized = false;
 
     public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
         ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
@@ -96,9 +100,13 @@ public class TestClientScanner {
     }
 
     @Override
-    protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
+    protected Result[] nextScanner(int nbRows) throws IOException {
+      if (!initialized) {
+        initialized = true;
+        return super.nextScanner(nbRows);
+      }
       if (!rpcFinished) {
-        return super.nextScanner(nbRows, done);
+        return super.nextScanner(nbRows);
       }
 
       // Enforce that we don't short-circuit more than once
@@ -107,7 +115,7 @@ public class TestClientScanner {
             " short-circuit was triggered.");
       }
       rpcFinishedFired = true;
-      return false;
+      return null;
     }
 
     @Override
@@ -158,14 +166,13 @@ public class TestClientScanner {
                 ScannerCallableWithReplicas.class);
           switch (count) {
             case 0: // initialize
-            case 2: // detect no more results
-            case 3: // close
-              count++;
-              return null;
-            case 1:
               count++;
-              callable.setHasMoreResultsContext(false);
+              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.UNKNOWN);
               return results;
+            case 1: // detect no more results
+            case 2: // close
+              count++;
+              return null;
             default:
               throw new RuntimeException("Expected only 2 invocations");
           }
@@ -221,15 +228,13 @@ public class TestClientScanner {
               ScannerCallableWithReplicas.class);
           switch (count) {
             case 0: // initialize
-            case 2: // close
-              count++;
-              return null;
-            case 1:
               count++;
-              callable.setHasMoreResultsContext(true);
-              // if we set false here the implementation will trigger a close
-              callable.setServerHasMoreResults(true);
+              // if we set no here the implementation will trigger a close
+              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
               return results;
+            case 1: // close
+              count++;
+              return null;
             default:
               throw new RuntimeException("Expected only 2 invocations");
           }
@@ -245,16 +250,11 @@ public class TestClientScanner {
 
     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
-
-      // Due to initializeScannerInConstruction()
-      Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
-          Mockito.anyInt());
-
       InOrder inOrder = Mockito.inOrder(caller);
 
       scanner.loadCache();
 
-      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
+      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(
           Mockito.any(RetryingCallable.class), Mockito.anyInt());
 
       assertEquals(1, scanner.cache.size());
@@ -289,15 +289,13 @@ public class TestClientScanner {
               ScannerCallableWithReplicas.class);
           switch (count) {
             case 0: // initialize
-            case 2: // close
-              count++;
-              return null;
-            case 1:
               count++;
-              callable.setHasMoreResultsContext(true);
-              // if we set false here the implementation will trigger a close
-              callable.setServerHasMoreResults(true);
+              // if we set no here the implementation will trigger a close
+              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
               return results;
+            case 1: // close
+              count++;
+              return null;
             default:
               throw new RuntimeException("Expected only 2 invocations");
           }
@@ -313,18 +311,11 @@ public class TestClientScanner {
 
     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
-
-      // Due to initializeScannerInConstruction()
-      Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
-          Mockito.anyInt());
-
       InOrder inOrder = Mockito.inOrder(caller);
 
       scanner.loadCache();
 
-      // Ensures that possiblyNextScanner isn't called at the end which would trigger
-      // another call to callWithoutRetries
-      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
+      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(
           Mockito.any(RetryingCallable.class), Mockito.anyInt());
 
       assertEquals(3, scanner.cache.size());
@@ -371,14 +362,12 @@ public class TestClientScanner {
               ScannerCallableWithReplicas.class);
           switch (count) {
             case 0: // initialize
-            case 2: // close
-              count++;
-              return null;
-            case 1:
               count++;
-              callable.setHasMoreResultsContext(true);
-              callable.setServerHasMoreResults(false);
+              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO);
               return results;
+            case 1: // close
+              count++;
+              return null;
             default:
               throw new RuntimeException("Expected only 2 invocations");
           }
@@ -393,18 +382,13 @@ public class TestClientScanner {
 
     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
-
-      // Due to initializeScannerInConstruction()
-      Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
-          Mockito.anyInt());
-
       scanner.setRpcFinished(true);
 
       InOrder inOrder = Mockito.inOrder(caller);
 
       scanner.loadCache();
 
-      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
+      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(
           Mockito.any(RetryingCallable.class), Mockito.anyInt());
 
       assertEquals(1, scanner.cache.size());
@@ -443,22 +427,19 @@ public class TestClientScanner {
                 ScannerCallableWithReplicas.class);
             switch (count) {
               case 0: // initialize
-              case 3: // close
-                count++;
-                return null;
-              case 1:
                 count++;
-                callable.setHasMoreResultsContext(true);
-                callable.setServerHasMoreResults(true);
+                callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
                 return results1;
-              case 2:
+              case 1:
                 count++;
                 // The server reports back false WRT more results
-                callable.setHasMoreResultsContext(true);
-                callable.setServerHasMoreResults(false);
+                callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO);
                 return results2;
+              case 2: // close
+                count++;
+                return null;
               default:
-                throw new RuntimeException("Expected only 2 invocations");
+                throw new RuntimeException("Expected only 3 invocations");
             }
           }
       });
@@ -469,17 +450,12 @@ public class TestClientScanner {
 
     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
-
-      // Due to initializeScannerInConstruction()
-      Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
-          Mockito.anyInt());
-
       InOrder inOrder = Mockito.inOrder(caller);
       scanner.setRpcFinished(true);
 
       scanner.loadCache();
 
-      inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
+      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
           Mockito.any(RetryingCallable.class), Mockito.anyInt());
 
       assertEquals(2, scanner.cache.size());
@@ -524,8 +500,8 @@ public class TestClientScanner {
         iter.next();
       }
       fail("Should have failed with RetriesExhaustedException");
-    } catch (RetriesExhaustedException expected) {
-
+    } catch (RuntimeException expected) {
+      assertThat(expected.getCause(), instanceOf(RetriesExhaustedException.class));
     }
   }
 
@@ -560,7 +536,5 @@ public class TestClientScanner {
         }
       };
     }
-
   }
-
 }


Mime
View raw message