hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [3/3] hbase git commit: HBASE-17508 Unify the implementation of small scan and regular scan for sync client
Date Sun, 05 Feb 2017 00:59:52 GMT
HBASE-17508 Unify the implementation of small scan and regular scan for sync client


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4456d228
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4456d228
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4456d228

Branch: refs/heads/branch-1
Commit: 4456d22859d7e1821f0b8b9cf3acdb8564b3cd09
Parents: fb12397
Author: zhangduo <zhangduo@apache.org>
Authored: Sat Feb 4 21:14:31 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Sun Feb 5 08:49:51 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      | 305 +++++----
 .../client/ClientSmallReversedScanner.java      | 345 ----------
 .../hadoop/hbase/client/ClientSmallScanner.java | 317 ---------
 .../hadoop/hbase/client/ConnectionManager.java  |  15 +-
 .../hadoop/hbase/client/ConnectionUtils.java    |  39 +-
 .../org/apache/hadoop/hbase/client/HTable.java  |  18 +-
 .../hbase/client/ReversedClientScanner.java     |  25 +-
 .../hbase/client/ReversedScannerCallable.java   |   2 +-
 .../org/apache/hadoop/hbase/client/Scan.java    |  68 ++
 .../hadoop/hbase/client/ScannerCallable.java    | 296 +++++----
 .../client/ScannerCallableWithReplicas.java     |  31 +-
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |  96 ++-
 .../hadoop/hbase/protobuf/RequestConverter.java |   8 +-
 .../hadoop/hbase/client/TestClientScanner.java  | 112 ++--
 .../client/TestClientSmallReversedScanner.java  | 349 ----------
 .../hbase/client/TestClientSmallScanner.java    | 339 ----------
 .../hbase/protobuf/generated/ClientProtos.java  | 655 ++++++++++++++++---
 hbase-protocol/src/main/protobuf/Client.proto   |  11 +
 .../hadoop/hbase/mapreduce/SyncTable.java       | 165 +++--
 .../hbase/regionserver/RSRpcServices.java       |  46 +-
 .../org/apache/hadoop/hbase/tool/Canary.java    |   5 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |  25 +-
 .../hbase/TestMetaTableAccessorNoCluster.java   |   2 +-
 .../hbase/TestPartialResultsFromClientSide.java |  12 +-
 .../client/TestClientScannerRPCTimeout.java     |   5 +
 .../hadoop/hbase/client/TestFromClientSide.java |   2 +-
 .../hadoop/hbase/client/TestLeaseRenewal.java   |   4 +-
 .../regionserver/TestRegionServerMetrics.java   |   2 +-
 .../regionserver/TestScannerWithBulkload.java   |   6 +-
 .../security/access/TestAccessController.java   |   1 -
 .../security/access/TestAccessController2.java  |   3 +
 31 files changed, 1308 insertions(+), 2001 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index fb2bc4b..40b5002 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
@@ -27,6 +28,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -57,9 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Private
 public class ClientScanner extends AbstractClientScanner {
   private static final Log LOG = LogFactory.getLog(ClientScanner.class);
-  // A byte array in which all elements are the max byte, and it is used to
-  // construct closest front row
-  static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
+
   protected Scan scan;
   protected boolean closed = false;
   // Current region scanner is against. Gets cleared if current region goes
@@ -156,12 +157,6 @@ public class ClientScanner extends AbstractClientScanner {
     this.rpcControllerFactory = controllerFactory;
 
     this.conf = conf;
-    initializeScannerInConstruction();
-  }
-
-  protected void initializeScannerInConstruction() throws IOException {
-    // initialize the scanner
-    nextScanner(this.caching, false);
   }
 
   protected ClusterConnection getConnection() {
@@ -242,31 +237,30 @@ public class ClientScanner extends AbstractClientScanner {
       this.callable = null;
     }
   }
-  /*
+
+  /**
    * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the
-   * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no
-   * further, just tidy up outstanding scanners, if <code>currentRegion != null</code> and
-   * <code>done</code> is true.
-   * @param nbRows
-   * @param done Server-side says we're done scanning.
+   * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow().
+   * @param nbRows the caching option of the scan
+   * @return the results fetched when opening the scanner, or null to terminate the scan.
    */
-  protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
+  protected Result[] nextScanner(int nbRows) throws IOException {
     // Close the previous scanner if it's open
     closeScanner();
 
     // Where to start the next scanner
     byte[] localStartKey;
 
-    // if we're at end of table, close and return false to stop iterating
+    // if we're at end of table, close and return null to stop iterating
     if (this.currentRegion != null) {
       byte[] endKey = this.currentRegion.getEndKey();
-      if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
-          || checkScanStopRow(endKey) || done) {
+      if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
+          checkScanStopRow(endKey)) {
         close();
         if (LOG.isTraceEnabled()) {
           LOG.trace("Finished " + this.currentRegion);
         }
-        return false;
+        return null;
       }
       localStartKey = endKey;
       // clear mvcc read point if we are going to switch regions
@@ -287,16 +281,23 @@ public class ClientScanner extends AbstractClientScanner {
       callable = getScannerCallable(localStartKey, nbRows);
       // Open a scanner on the region server starting at the
       // beginning of the region
-      call(callable, caller, scannerTimeout);
+      Result[] rrs = call(callable, caller, scannerTimeout);
       this.currentRegion = callable.getHRegionInfo();
       if (this.scanMetrics != null) {
         this.scanMetrics.countOfRegions.incrementAndGet();
       }
+      if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) {
+        // no results for the scan, return null to terminate the scan.
+        closed = true;
+        callable = null;
+        currentRegion = null;
+        return null;
+      }
+      return rrs;
     } catch (IOException e) {
-      close();
+      closeScanner();
       throw e;
     }
-    return true;
   }
 
   @VisibleForTesting
@@ -304,9 +305,8 @@ public class ClientScanner extends AbstractClientScanner {
     return callable.isAnyRPCcancelled();
   }
 
-  Result[] call(ScannerCallableWithReplicas callable,
-      RpcRetryingCaller<Result[]> caller, int scannerTimeout)
-      throws IOException, RuntimeException {
+  private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
+      int scannerTimeout) throws IOException {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
@@ -369,22 +369,19 @@ public class ClientScanner extends AbstractClientScanner {
     return cache != null ? cache.size() : 0;
   }
 
+  private boolean scanExhausted(Result[] values) {
+    // This means the server tells us the whole scan operation is done. Usually decided by filter or
+    // limit.
+    return values == null || callable.moreResultsForScan() == MoreResults.NO;
+  }
+
   private boolean regionExhausted(Result[] values) {
-    // This means the server tells us the whole scan operation is done. Usually decided by filter.
-    if (values == null) {
-      return true;
-    }
-    // Not a heartbeat message and we get nothing, this means the region is exhausted
-    if (values.length == 0 && !callable.isHeartbeatMessage()) {
-      return true;
-    }
-    // Server tells us that it has no more results for this region. Notice that this flag is get
-    // from the ScanResponse.getMoreResultsInRegion, not ScanResponse.getMoreResults. If the latter
-    // one is false then we will get a null values and quit in the first condition of this method.
-    if (callable.hasMoreResultsContext() && !callable.getServerHasMoreResults()) {
-      return true;
-    }
-    return false;
+    // 1. Not a heartbeat message and we get nothing, this means the region is exhausted. In the
+    // past we always returned an empty result for an open scanner operation, so we add a check
+    // here to stay compatible with the old logic. Should remove the isOpenScanner in the future.
+    // 2. Server tells us that it has no more results for this region.
+    return (values.length == 0 && !callable.isHeartbeatMessage() && !callable.isOpenScanner())
+        || callable.moreResultsInRegion() == MoreResults.NO;
   }
 
   private void closeScannerIfExhausted(boolean exhausted) throws IOException {
@@ -400,16 +397,87 @@ public class ClientScanner extends AbstractClientScanner {
     }
   }
 
+  private Result[] nextScannerWithRetries(int nbRows) throws IOException {
+    for (;;) {
+      try {
+        return nextScanner(nbRows);
+      } catch (DoNotRetryIOException e) {
+        handleScanError(e, null);
+      }
+    }
+  }
+
+  private void handleScanError(DoNotRetryIOException e,
+      MutableBoolean retryAfterOutOfOrderException) throws DoNotRetryIOException {
+    // An exception was thrown which makes any partial results that we were collecting
+    // invalid. The scanner will need to be reset to the beginning of a row.
+    clearPartialResults();
+    // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
+    // to reset the scanner and come back in again.
+
+    // If exception is any but the list below throw it back to the client; else setup
+    // the scanner and retry.
+    Throwable cause = e.getCause();
+    if ((cause != null && cause instanceof NotServingRegionException) ||
+        (cause != null && cause instanceof RegionServerStoppedException) ||
+        e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException ||
+        e instanceof ScannerResetException) {
+      // Pass. It is easier writing the if loop test as list of what is allowed rather than
+      // as a list of what is not allowed... so if in here, it means we do not throw.
+    } else {
+      throw e;
+    }
+
+    // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
+    if (this.lastResult != null) {
+      // The region has moved. We need to open a brand new scanner at the new location.
+      // Reset the startRow to the row we've seen last so that the new scanner starts at
+      // the correct row. Otherwise we may see previously returned rows again.
+      // (ScannerCallable by now has "relocated" the correct region)
+      if (!this.lastResult.isPartial() && scan.getBatch() < 0) {
+        if (scan.isReversed()) {
+          scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
+        } else {
+          scan.setStartRow(createClosestRowAfter(lastResult.getRow()));
+        }
+      } else {
+        // we need rescan this row because we only loaded partial row before
+        scan.setStartRow(lastResult.getRow());
+      }
+    }
+    if (e instanceof OutOfOrderScannerNextException) {
+      if (retryAfterOutOfOrderException != null) {
+        if (retryAfterOutOfOrderException.isTrue()) {
+          retryAfterOutOfOrderException.setValue(false);
+        } else {
+          // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
+          throw new DoNotRetryIOException(
+              "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e);
+        }
+      }
+    }
+    // Clear region.
+    this.currentRegion = null;
+    // Set this to zero so we don't try and do an rpc and close on remote server when
+    // the exception we got was UnknownScanner or the Server is going down.
+    callable = null;
+  }
+
   /**
    * Contact the servers to load more {@link Result}s in the cache.
    */
   protected void loadCache() throws IOException {
+    // check if scanner was closed during previous prefetch
+    if (closed) {
+      return;
+    }
     Result[] values = null;
     long remainingResultSize = maxScannerResultSize;
     int countdown = this.caching;
     // This is possible if we just stopped at the boundary of a region in the previous call.
     if (callable == null) {
-      if (!nextScanner(countdown, false)) {
+      values = nextScannerWithRetries(countdown);
+      if (values == null) {
         return;
       }
     }
@@ -417,80 +485,39 @@ public class ClientScanner extends AbstractClientScanner {
     callable.setCaching(this.caching);
     // This flag is set when we want to skip the result returned. We do
     // this when we reset scanner because it split under us.
-    boolean retryAfterOutOfOrderException = true;
+    MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
     for (;;) {
       try {
         // Server returns a null values if scanning is to stop. Else,
         // returns an empty array if scanning is to go on and we've just
         // exhausted current region.
-        values = call(callable, caller, scannerTimeout);
+        // now we will also fetch data when openScanner, so do not make a next call again if values
+        // is already non-null.
+        if (values == null) {
+          values = call(callable, caller, scannerTimeout);
+        }
         // When the replica switch happens, we need to do certain operations again.
         // The callable will openScanner with the right startkey but we need to pick up
         // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
         // of the loop as it happens for the cases where we see exceptions.
-        // Since only openScanner would have happened, values would be null
-        if (values == null && callable.switchedToADifferentReplica()) {
+        if (callable.switchedToADifferentReplica()) {
           // Any accumulated partial results are no longer valid since the callable will
           // openScanner with the correct startkey and we must pick up from there
           clearPartialResults();
           this.currentRegion = callable.getHRegionInfo();
-          continue;
-        }
-        retryAfterOutOfOrderException = true;
-      } catch (DoNotRetryIOException | NeedUnmanagedConnectionException e) {
-        // An exception was thrown which makes any partial results that we were collecting
-        // invalid. The scanner will need to be reset to the beginning of a row.
-        clearPartialResults();
-        // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
-        // to reset the scanner and come back in again.
-
-        // If exception is any but the list below throw it back to the client; else setup
-        // the scanner and retry.
-        Throwable cause = e.getCause();
-        if ((cause != null && cause instanceof NotServingRegionException) ||
-            (cause != null && cause instanceof RegionServerStoppedException) ||
-            e instanceof OutOfOrderScannerNextException ||
-            e instanceof UnknownScannerException ||
-            e instanceof ScannerResetException) {
-          // Pass. It is easier writing the if loop test as list of what is allowed rather than
-          // as a list of what is not allowed... so if in here, it means we do not throw.
-        } else {
-          throw e;
-        }
-
-        // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
-        if (this.lastResult != null) {
-          // The region has moved. We need to open a brand new scanner at the new location.
-          // Reset the startRow to the row we've seen last so that the new scanner starts at
-          // the correct row. Otherwise we may see previously returned rows again.
-          // (ScannerCallable by now has "relocated" the correct region)
-          if (!this.lastResult.isPartial() && scan.getBatch() < 0) {
-            if (scan.isReversed()) {
-              scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
-            } else {
-              scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
-            }
-          } else {
-            // we need rescan this row because we only loaded partial row before
-            scan.setStartRow(lastResult.getRow());
-          }
-        }
-        if (e instanceof OutOfOrderScannerNextException) {
-          if (retryAfterOutOfOrderException) {
-            retryAfterOutOfOrderException = false;
-          } else {
-            // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
-            throw new DoNotRetryIOException("Failed after retry of " +
-                "OutOfOrderScannerNextException: was there a rpc timeout?", e);
+          // Now we will also fetch data when openScanner so usually we should not get a null
+          // result, but at some places we still use null to indicate the scan is terminated, so add
+          // a sanity check here. Should be removed later.
+          if (values == null) {
+            continue;
           }
         }
-        // Clear region.
-        this.currentRegion = null;
-        // Set this to zero so we don't try and do an rpc and close on remote server when
-        // the exception we got was UnknownScanner or the Server is going down.
-        callable = null;
+        retryAfterOutOfOrderException.setValue(true);
+      } catch (DoNotRetryIOException e) {
+        handleScanError(e, retryAfterOutOfOrderException);
         // reopen the scanner
-        if (!nextScanner(countdown, false)) {
+        values = nextScannerWithRetries(countdown);
+        if (values == null) {
           break;
         }
         continue;
@@ -523,8 +550,18 @@ public class ClientScanner extends AbstractClientScanner {
             this.lastCellLoadedToCache = null;
           }
         }
+        if (scan.getLimit() > 0) {
+          int limit = scan.getLimit() - resultsToAddToCache.size();
+          assert limit >= 0;
+          scan.setLimit(limit);
+        }
       }
-      boolean exhausted = regionExhausted(values);
+      if (scanExhausted(values)) {
+        closeScanner();
+        closed = true;
+        break;
+      }
+      boolean regionExhausted = regionExhausted(values);
       if (callable.isHeartbeatMessage()) {
         if (!cache.isEmpty()) {
           // Caller of this method just wants a Result. If we see a heartbeat message, it means
@@ -542,12 +579,12 @@ public class ClientScanner extends AbstractClientScanner {
       }
       if (countdown <= 0) {
         // we have enough result.
-        closeScannerIfExhausted(exhausted);
+        closeScannerIfExhausted(regionExhausted);
         break;
       }
       if (remainingResultSize <= 0) {
         if (!cache.isEmpty()) {
-          closeScannerIfExhausted(exhausted);
+          closeScannerIfExhausted(regionExhausted);
           break;
         } else {
           // we have reached the max result size but we still can not find anything to return to the
@@ -556,17 +593,21 @@ public class ClientScanner extends AbstractClientScanner {
         }
       }
       // we are done with the current region
-      if (exhausted) {
+      if (regionExhausted) {
         if (!partialResults.isEmpty()) {
           // XXX: continue if there are partial results. But in fact server should not set
           // hasMoreResults to false if there are partial results.
           LOG.warn("Server tells us there is no more results for this region but we still have"
               + " partialResults, this should not happen, retry on the current scanner anyway");
+          values = null; // reset values for the next call
           continue;
         }
-        if (!nextScanner(countdown, values == null)) {
+        values = nextScannerWithRetries(countdown);
+        if (values == null) {
           break;
         }
+      } else {
+        values = null; // reset values for the next call
       }
     }
   }
@@ -738,46 +779,24 @@ public class ClientScanner extends AbstractClientScanner {
     }
   }
 
-    @Override
-    public void close() {
-      if (!scanMetricsPublished) writeScanMetrics();
-      if (callable != null) {
-        callable.setClose();
-        try {
-          call(callable, caller, scannerTimeout);
-        } catch (UnknownScannerException e) {
-           // We used to catch this error, interpret, and rethrow. However, we
-           // have since decided that it's not nice for a scanner's close to
-           // throw exceptions. Chances are it was just due to lease time out.
-        } catch (IOException e) {
-           /* An exception other than UnknownScanner is unexpected. */
-           LOG.warn("scanner failed to close. Exception follows: " + e);
-        }
-        callable = null;
+  @Override
+  public void close() {
+    if (!scanMetricsPublished) writeScanMetrics();
+    if (callable != null) {
+      callable.setClose();
+      try {
+        call(callable, caller, scannerTimeout);
+      } catch (UnknownScannerException e) {
+        // We used to catch this error, interpret, and rethrow. However, we
+        // have since decided that it's not nice for a scanner's close to
+        // throw exceptions. Chances are it was just due to lease time out.
+      } catch (IOException e) {
+        /* An exception other than UnknownScanner is unexpected. */
+        LOG.warn("scanner failed to close. Exception follows: " + e);
       }
-      closed = true;
-    }
-
-  /**
-   * Create the closest row before the specified row
-   * @param row
-   * @return a new byte array which is the closest front row of the specified one
-   */
-  protected static byte[] createClosestRowBefore(byte[] row) {
-    if (row == null) {
-      throw new IllegalArgumentException("The passed row is empty");
-    }
-    if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
-      return MAX_BYTE_ARRAY;
-    }
-    if (row[row.length - 1] == 0) {
-      return Arrays.copyOf(row, row.length - 1);
-    } else {
-      byte[] closestFrontRow = Arrays.copyOf(row, row.length);
-      closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
-      closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
-      return closestFrontRow;
+      callable = null;
     }
+    closed = true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
deleted file mode 100644
index bd5575a..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.ExecutorService;
-
-/**
- * <p>
- * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
- * scan results, unless the results cross multiple regions or the row count of
- * results exceed the caching.
- * </p>
- * For small scan, it will get better performance than {@link ReversedClientScanner}
- */
-@InterfaceAudience.Private
-public class ClientSmallReversedScanner extends ReversedClientScanner {
-  private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
-  private ScannerCallableWithReplicas smallReversedScanCallable = null;
-  private SmallReversedScannerCallableFactory callableFactory;
-
-  /**
-   * Create a new ReversibleClientScanner for the specified table. Take note that the passed
-   * {@link Scan} 's start row maybe changed changed.
-   *
-   * @param conf
-   *          The {@link Configuration} to use.
-   * @param scan
-   *          {@link Scan} to use in this scanner
-   * @param tableName
-   *          The table that we wish to rangeGet
-   * @param connection
-   *          Connection identifying the cluster
-   * @param rpcFactory
-   *          Factory used to create the {@link RpcRetryingCaller}
-   * @param controllerFactory
-   *          Factory used to access RPC payloads
-   * @param pool
-   *          Threadpool for RPC threads
-   * @param primaryOperationTimeout
-   *          Call timeout
-   * @throws IOException
-   *           If the remote call fails
-   */
-  public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
-      final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
-      throws IOException {
-    this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-        primaryOperationTimeout, new SmallReversedScannerCallableFactory());
-  }
-
-  /**
-   * Create a new ReversibleClientScanner for the specified table. Take note that the passed
-   * {@link Scan}'s start row may be changed.
-   *
-   * @param conf
-   *          The {@link Configuration} to use.
-   * @param scan
-   *          {@link Scan} to use in this scanner
-   * @param tableName
-   *          The table that we wish to rangeGet
-   * @param connection
-   *          Connection identifying the cluster
-   * @param rpcFactory
-   *          Factory used to create the {@link RpcRetryingCaller}
-   * @param controllerFactory
-   *          Factory used to access RPC payloads
-   * @param pool
-   *          Threadpool for RPC threads
-   * @param primaryOperationTimeout
-   *          Call timeout
-   * @param callableFactory
-   *          Factory used to create the {@link SmallScannerCallable}
-   * @throws IOException
-   *           If the remote call fails
-   */
-  @VisibleForTesting
-  ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
-      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
-      SmallReversedScannerCallableFactory callableFactory) throws IOException {
-    super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-        primaryOperationTimeout);
-    this.callableFactory = callableFactory;
-  }
-
-  /**
-   * Gets a scanner for following scan. Move to next region or continue from the last result or
-   * start from the start row.
-   *
-   * @param nbRows
-   * @param done
-   *          true if Server-side says we're done scanning.
-   * @param currentRegionDone
-   *          true if scan is over on current region
-   * @return true if has next scanner
-   * @throws IOException
-   */
-  private boolean nextScanner(int nbRows, final boolean done,
-                              boolean currentRegionDone) throws IOException {
-    // Where to start the next getter
-    byte[] localStartKey;
-    int cacheNum = nbRows;
-    boolean regionChanged = true;
-    boolean isFirstRegionToLocate = false;
-    // if we're at end of table, close and return false to stop iterating
-    if (this.currentRegion != null && currentRegionDone) {
-      byte[] startKey = this.currentRegion.getStartKey();
-      if (startKey == null
-          || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
-          || checkScanStopRow(startKey) || done) {
-        close();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Finished with small scan at " + this.currentRegion);
-        }
-        return false;
-      }
-      // We take the row just under to get to the previous region.
-      localStartKey = createClosestRowBefore(startKey);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Finished with region " + this.currentRegion);
-      }
-    } else if (this.lastResult != null) {
-      regionChanged = false;
-      localStartKey = createClosestRowBefore(lastResult.getRow());
-    } else {
-      localStartKey = this.scan.getStartRow();
-      isFirstRegionToLocate = true;
-    }
-
-    if (!isFirstRegionToLocate
-        && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
-      // when non-firstRegion & localStartKey is empty bytes, no more rowKey should scan.
-      // otherwise, maybe infinity results with RowKey=0x00 will return.
-      return false;
-    }
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Advancing internal small scanner to startKey at '"
-          + Bytes.toStringBinary(localStartKey) + "'");
-    }
-
-    smallReversedScanCallable =
-        callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(),
-          localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
-          getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate);
-
-    if (this.scanMetrics != null && regionChanged) {
-      this.scanMetrics.countOfRegions.incrementAndGet();
-    }
-    return true;
-  }
-
-  @Override
-  public Result next() throws IOException {
-    // If the scanner is closed and there's nothing left in the cache, next is a
-    // no-op.
-    if (cache.size() == 0 && this.closed) {
-      return null;
-    }
-    if (cache.size() == 0) {
-      loadCache();
-    }
-
-    if (cache.size() > 0) {
-      return cache.poll();
-    }
-    // if we exhausted this scanner before calling close, write out the scan
-    // metrics
-    writeScanMetrics();
-    return null;
-  }
-
-  @Override
-  protected void loadCache() throws IOException {
-    Result[] values = null;
-    long remainingResultSize = maxScannerResultSize;
-    int countdown = this.caching;
-    boolean currentRegionDone = false;
-    // Values == null means server-side filter has determined we must STOP
-    while (remainingResultSize > 0 && countdown > 0
-        && nextScanner(countdown, values == null, currentRegionDone)) {
-      // Server returns a null values if scanning is to stop. Else,
-      // returns an empty array if scanning is to go on and we've just
-      // exhausted current region.
-      // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
-      // we do a callWithRetries
-      values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout);
-      this.currentRegion = smallReversedScanCallable.getHRegionInfo();
-      long currentTime = System.currentTimeMillis();
-      if (this.scanMetrics != null) {
-        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
-            - lastNext);
-      }
-      lastNext = currentTime;
-      if (values != null && values.length > 0) {
-        for (int i = 0; i < values.length; i++) {
-          Result rs = values[i];
-          cache.add(rs);
-          // We don't make Iterator here
-          for (Cell cell : rs.rawCells()) {
-            remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
-          }
-          countdown--;
-          this.lastResult = rs;
-        }
-      }
-      if (smallReversedScanCallable.hasMoreResultsContext()) {
-        currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults();
-      } else {
-        currentRegionDone = countdown > 0;
-      }
-    }
-  }
-
-  @Override
-  protected void initializeScannerInConstruction() throws IOException {
-    // No need to initialize the scanner when constructing instance, do it when
-    // calling next(). Do nothing here.
-  }
-
-  @Override
-  public void close() {
-    if (!scanMetricsPublished) writeScanMetrics();
-    closed = true;
-  }
-
-  @VisibleForTesting
-  protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) {
-    this.callableFactory = callableFactory;
-  }
-
-  /**
-   * A reversed ScannerCallable which supports backward small scanning.
-   */
-  static class SmallReversedScannerCallable extends ReversedScannerCallable {
-
-    public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan,
-        ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory,
-        int caching, int replicaId) {
-      super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId);
-      this.setCaching(caching);
-    }
-
-    @Override
-    public Result[] call(int timeout) throws IOException {
-      if (this.closed) return null;
-      if (Thread.interrupted()) {
-        throw new InterruptedIOException();
-      }
-      ClientProtos.ScanRequest request = RequestConverter.buildScanRequest(
-        getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true);
-      ClientProtos.ScanResponse response = null;
-      controller = controllerFactory.newController();
-      try {
-        controller.setPriority(getTableName());
-        controller.setCallTimeout(timeout);
-        response = getStub().scan(controller, request);
-        Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
-        if (response.hasMoreResultsInRegion()) {
-          setHasMoreResultsContext(true);
-          setServerHasMoreResults(response.getMoreResultsInRegion());
-        } else {
-          setHasMoreResultsContext(false);
-        }
-        // We need to update result metrics since we are overriding call()
-        updateResultsMetrics(results);
-        return results;
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
-    }
-
-    @Override
-    public ScannerCallable getScannerCallableForReplica(int id) {
-      return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(),
-          scanMetrics, locateStartRow, controllerFactory, getCaching(), id);
-    }
-  }
-
-  protected static class SmallReversedScannerCallableFactory {
-
-    public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
-        Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
-        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
-        int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
-        boolean isFirstRegionToLocate) {
-      byte[] locateStartRow = null;
-      if (isFirstRegionToLocate
-          && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
-        // HBASE-16886: if not setting startRow, then we will use a range [MAX_BYTE_ARRAY, +oo) to
-        // locate a region list, and the last one in region list is the region where our scan start.
-        locateStartRow = ClientScanner.MAX_BYTE_ARRAY;
-      }
-
-      scan.setStartRow(localStartKey);
-      SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan,
-          scanMetrics, locateStartRow, controllerFactory, cacheNum, 0);
-      ScannerCallableWithReplicas scannerCallableWithReplicas =
-          new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan,
-              retries, scannerTimeout, cacheNum, conf, caller);
-      return scannerCallableWithReplicas;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
deleted file mode 100644
index b1554fd..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
-
-/**
- * Client scanner for small scan. Generally, only one RPC is called to fetch the
- * scan results, unless the results cross multiple regions or the row count of
- * results excess the caching.
- *
- * For small scan, it will get better performance than {@link ClientScanner}
- */
-@InterfaceAudience.Private
-public class ClientSmallScanner extends ClientScanner {
-  private static final Log LOG = LogFactory.getLog(ClientSmallScanner.class);
-  private ScannerCallableWithReplicas smallScanCallable = null;
-  private SmallScannerCallableFactory callableFactory;
-
-  /**
-   * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
-   * 's start row maybe changed changed.
-   *
-   * @param conf
-   *          The {@link Configuration} to use.
-   * @param scan
-   *          {@link Scan} to use in this scanner
-   * @param tableName
-   *          The table that we wish to rangeGet
-   * @param connection
-   *          Connection identifying the cluster
-   * @param rpcFactory
-   *          Factory used to create the {@link RpcRetryingCaller}
-   * @param controllerFactory
-   *          Factory used to access RPC payloads
-   * @param pool
-   *          Threadpool for RPC threads
-   * @param primaryOperationTimeout
-   *          Call timeout
-   * @throws IOException
-   *           If the remote call fails
-   */
-  public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
-      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
-      throws IOException {
-    this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-        primaryOperationTimeout, new SmallScannerCallableFactory());
-  }
-
-  /**
-   * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
-   * 's start row maybe changed changed. Intended for unit tests to provide their own
-   * {@link SmallScannerCallableFactory} implementation/mock.
-   *
-   * @param conf
-   *          The {@link Configuration} to use.
-   * @param scan
-   *          {@link Scan} to use in this scanner
-   * @param tableName
-   *          The table that we wish to rangeGet
-   * @param connection
-   *          Connection identifying the cluster
-   * @param rpcFactory
-   *          Factory used to create the {@link RpcRetryingCaller}
-   * @param controllerFactory
-   *          Factory used to access RPC payloads
-   * @param pool
-   *          Threadpool for RPC threads
-   * @param primaryOperationTimeout
-   *          Call timeout
-   * @param callableFactory
-   *          Factory used to create the {@link SmallScannerCallable}
-   * @throws IOException
-   */
-  @VisibleForTesting
-  ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
-      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
-      SmallScannerCallableFactory callableFactory) throws IOException {
-    super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-        primaryOperationTimeout);
-    this.callableFactory = callableFactory;
-  }
-
-  @Override
-  protected void initializeScannerInConstruction() throws IOException {
-    // No need to initialize the scanner when constructing instance, do it when
-    // calling next(). Do nothing here.
-  }
-
-  /**
-   * Gets a scanner for following scan. Move to next region or continue from the
-   * last result or start from the start row.
-   * @param nbRows
-   * @param done true if Server-side says we're done scanning.
-   * @param currentRegionDone true if scan is over on current region
-   * @return true if has next scanner
-   * @throws IOException
-   */
-  private boolean nextScanner(int nbRows, final boolean done,
-      boolean currentRegionDone) throws IOException {
-    // Where to start the next getter
-    byte[] localStartKey;
-    int cacheNum = nbRows;
-    boolean regionChanged = true;
-    // if we're at end of table, close and return false to stop iterating
-    if (this.currentRegion != null && currentRegionDone) {
-      byte[] endKey = this.currentRegion.getEndKey();
-      if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
-          || checkScanStopRow(endKey) || done) {
-        close();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Finished with small scan at " + this.currentRegion);
-        }
-        return false;
-      }
-      localStartKey = endKey;
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Finished with region " + this.currentRegion);
-      }
-    } else if (this.lastResult != null) {
-      regionChanged = false;
-      localStartKey = Bytes.add(lastResult.getRow(), new byte[1]);
-    } else {
-      localStartKey = this.scan.getStartRow();
-    }
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Advancing internal small scanner to startKey at '"
-          + Bytes.toStringBinary(localStartKey) + "'");
-    }
-    smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
-        getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
-        getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
-    if (this.scanMetrics != null && regionChanged) {
-      this.scanMetrics.countOfRegions.incrementAndGet();
-    }
-    return true;
-  }
-
-  static class SmallScannerCallable extends ScannerCallable {
-    public SmallScannerCallable(
-        ClusterConnection connection, TableName table, Scan scan,
-        ScanMetrics scanMetrics, RpcControllerFactory controllerFactory, int caching, int id) {
-      super(connection, table, scan, scanMetrics, controllerFactory, id);
-      this.setCaching(caching);
-    }
-
-    @Override
-    public Result[] call(int timeout) throws IOException {
-      if (this.closed) return null;
-      if (Thread.interrupted()) {
-        throw new InterruptedIOException();
-      }
-      ScanRequest request = RequestConverter.buildScanRequest(getLocation()
-          .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
-      ScanResponse response = null;
-      controller = controllerFactory.newController();
-      try {
-        controller.setPriority(getTableName());
-        controller.setCallTimeout(timeout);
-        response = getStub().scan(controller, request);
-        Result[] results = ResponseConverter.getResults(controller.cellScanner(),
-            response);
-        if (response.hasMoreResultsInRegion()) {
-          setHasMoreResultsContext(true);
-          setServerHasMoreResults(response.getMoreResultsInRegion());
-        } else {
-          setHasMoreResultsContext(false);
-        }
-        // We need to update result metrics since we are overriding call()
-        updateResultsMetrics(results);
-        return results;
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
-    }
-
-    @Override
-    public ScannerCallable getScannerCallableForReplica(int id) {
-      return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(),
-          scanMetrics, controllerFactory, getCaching(), id);
-    }
-  }
-
-  @Override
-  public Result next() throws IOException {
-    // If the scanner is closed and there's nothing left in the cache, next is a
-    // no-op.
-    if (cache.size() == 0 && this.closed) {
-      return null;
-    }
-    if (cache.size() == 0) {
-      loadCache();
-    }
-
-    if (cache.size() > 0) {
-      return cache.poll();
-    }
-    // if we exhausted this scanner before calling close, write out the scan
-    // metrics
-    writeScanMetrics();
-    return null;
-  }
-
-  @Override
-  protected void loadCache() throws IOException {
-    Result[] values = null;
-    long remainingResultSize = maxScannerResultSize;
-    int countdown = this.caching;
-    boolean currentRegionDone = false;
-    // Values == null means server-side filter has determined we must STOP
-    while (remainingResultSize > 0 && countdown > 0
-        && nextScanner(countdown, values == null, currentRegionDone)) {
-      // Server returns a null values if scanning is to stop. Else,
-      // returns an empty array if scanning is to go on and we've just
-      // exhausted current region.
-      // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
-      // we do a callWithRetries
-      values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
-      this.currentRegion = smallScanCallable.getHRegionInfo();
-      long currentTime = System.currentTimeMillis();
-      if (this.scanMetrics != null) {
-        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
-            - lastNext);
-      }
-      lastNext = currentTime;
-      if (values != null && values.length > 0) {
-        for (int i = 0; i < values.length; i++) {
-          Result rs = values[i];
-          cache.add(rs);
-          // We don't make Iterator here
-          for (Cell cell : rs.rawCells()) {
-            remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
-          }
-          countdown--;
-          this.lastResult = rs;
-        }
-      }
-      if (smallScanCallable.hasMoreResultsContext()) {
-        // If the server has more results, the current region is not done
-        currentRegionDone = !smallScanCallable.getServerHasMoreResults();
-      } else {
-        // not guaranteed to get the context in older versions, fall back to checking countdown
-        currentRegionDone = countdown > 0;
-      }
-    }
-  }
-
-  public void close() {
-    if (!scanMetricsPublished) writeScanMetrics();
-    closed = true;
-  }
-
-  @VisibleForTesting
-  protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
-    this.callableFactory = callableFactory;
-  }
-
-  @InterfaceAudience.Private
-  protected static class SmallScannerCallableFactory {
-
-    public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
-        Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
-        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
-        int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller) {
-      scan.setStartRow(localStartKey);
-      SmallScannerCallable s = new SmallScannerCallable(
-        connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0);
-      ScannerCallableWithReplicas scannerCallableWithReplicas =
-          new ScannerCallableWithReplicas(table, connection,
-              s, pool, primaryOperationTimeout, scan, retries,
-              scannerTimeout, cacheNum, conf, caller);
-      return scannerCallableWithReplicas;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 5f5badf..ab6cb8d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -1257,8 +1257,7 @@ class ConnectionManager {
       Scan s = new Scan();
       s.setReversed(true);
       s.setStartRow(metaKey);
-      s.setSmall(true);
-      s.setCaching(1);
+      s.setOneRowLimit();
       if (this.useMetaReplicas) {
         s.setConsistency(Consistency.TIMELINE);
       }
@@ -1286,15 +1285,11 @@ class ConnectionManager {
         long pauseBase = this.pause;
         try {
           Result regionInfoRow = null;
-          ReversedClientScanner rcs = null;
-          try {
-            rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
-              rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
+          s.resetMvccReadPoint();
+          try (ReversedClientScanner rcs =
+              new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
+                  rpcControllerFactory, getMetaLookupPool(), 0)) {
             regionInfoRow = rcs.next();
-          } finally {
-            if (rcs != null) {
-              rcs.close();
-            }
           }
 
           if (regionInfoRow == null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index c1e6d23..e7b4114 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -17,14 +17,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -32,8 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
-
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Utility used by client connections.
@@ -195,4 +196,36 @@ public class ConnectionUtils {
       return false;
     }
   }
+
+  // A byte array in which all elements are the max byte; it is used to
+  // construct the closest row before a given row
+  static final byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
+
+  /**
+   * Create the closest row after the specified row
+   */
+  static byte[] createClosestRowAfter(byte[] row) {
+    return Arrays.copyOf(row, row.length + 1);
+  }
+
+  /**
+   * Create the closest row before the specified row
+   * @deprecated in fact, we do not know the closest row before the given row, the result is only a
+   *             row very close to the current row. Avoid using this method in the future.
+   */
+  @Deprecated
+  static byte[] createClosestRowBefore(byte[] row) {
+    if (row.length == 0) {
+      return MAX_BYTE_ARRAY;
+    }
+    if (row[row.length - 1] == 0) {
+      return Arrays.copyOf(row, row.length - 1);
+    } else {
+      byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
+      System.arraycopy(row, 0, nextRow, 0, row.length - 1);
+      nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
+      System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length);
+      return nextRow;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 83e7217..1e3a900 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -795,21 +795,9 @@ public class HTable implements HTableInterface, RegionLocator {
     }
 
     if (scan.isReversed()) {
-      if (scan.isSmall()) {
-        return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
-            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
-            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
-      } else {
-        return new ReversedClientScanner(getConfiguration(), scan, getName(),
-            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
-            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
-      }
-    }
-
-    if (scan.isSmall()) {
-      return new ClientSmallScanner(getConfiguration(), scan, getName(),
-          this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
-          pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
+      return new ReversedClientScanner(getConfiguration(), scan, getName(),
+        this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
+        pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
     } else {
       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
           this.rpcCallerFactory, this.rpcControllerFactory,

http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index ca998ae..edb66c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -18,15 +18,18 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -58,8 +61,7 @@ public class ReversedClientScanner extends ClientScanner {
   }
 
   @Override
-  protected boolean nextScanner(int nbRows, final boolean done)
-      throws IOException {
+  protected Result[] nextScanner(int nbRows) throws IOException {
     // Close the previous scanner if it's open
     closeScanner();
 
@@ -69,16 +71,17 @@ public class ReversedClientScanner extends ClientScanner {
     // if we're at start of table, close and return false to stop iterating
     if (this.currentRegion != null) {
       byte[] startKey = this.currentRegion.getStartKey();
-      if (startKey == null
-          || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
-          || checkScanStopRow(startKey) || done) {
+      if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
+          || checkScanStopRow(startKey)) {
         close();
         if (LOG.isDebugEnabled()) {
           LOG.debug("Finished " + this.currentRegion);
         }
-        return false;
+        return null;
       }
       localStartKey = startKey;
+      // clear mvcc read point if we are going to switch regions
+      scan.resetMvccReadPoint();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Finished " + this.currentRegion);
       }
@@ -109,17 +112,21 @@ public class ReversedClientScanner extends ClientScanner {
       // beginning of the region
       // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
       // we do a callWithRetries
-      this.caller.callWithoutRetries(callable, scannerTimeout);
+      Result[] rrs = this.caller.callWithoutRetries(callable, scannerTimeout);
       this.currentRegion = callable.getHRegionInfo();
       if (this.scanMetrics != null) {
         this.scanMetrics.countOfRegions.incrementAndGet();
       }
+      if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) {
+        // no results for the scan, return null to terminate the scan.
+        return null;
+      }
+      return rrs;
     } catch (IOException e) {
       ExceptionUtil.rethrowIfInterrupt(e);
       close();
       throw e;
     }
-    return true;
   }
 
   protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey,

http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index e169f7a..840af97 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -44,7 +44,7 @@ public class ReversedScannerCallable extends ScannerCallable {
   /**
    * The start row for locating regions. In reversed scanner, may locate the
    * regions for a range of keys when doing
-   * {@link ReversedClientScanner#nextScanner(int, boolean)}
+   * {@link ReversedClientScanner#nextScanner(int)}
    */
   protected final byte[] locateStartRow;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 128e7e1..84f1ca9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -175,6 +175,17 @@ public class Scan extends Query {
   private long mvccReadPoint = -1L;
 
   /**
+   * The number of rows we want for this scan. We will terminate the scan if the number of returned
+   * rows reaches this value.
+   */
+  private int limit = -1;
+
+  /**
+   * Control whether to use pread at server side.
+   */
+  private ReadType readType = ReadType.DEFAULT;
+
+  /**
    * Create a Scan operation across all rows.
    */
   public Scan() {}
@@ -253,6 +264,7 @@ public class Scan extends Query {
       setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
     }
     this.mvccReadPoint = scan.getMvccReadPoint();
+    this.limit = scan.getLimit();
   }
 
   /**
@@ -1014,6 +1026,62 @@ public class Scan extends Query {
   }
 
   /**
+   * @return the limit of rows for this scan
+   */
+  public int getLimit() {
+    return limit;
+  }
+
+  /**
+   * Set the limit of rows for this scan. We will terminate the scan if the number of returned rows
+   * reaches this value.
+   * <p>
+   * This condition will be tested last, after all other conditions such as stopRow, filter, etc.
+   * <p>
+   * Can not be used together with batch and allowPartial.
+   * @param limit the limit of rows for this scan
+   * @return this
+   */
+  public Scan setLimit(int limit) {
+    this.limit = limit;
+    return this;
+  }
+
+  /**
+   * Call this when you only want to get one row. It will set {@code limit} to {@code 1}, and also
+   * set {@code readType} to {@link ReadType#PREAD}.
+   * @return this
+   */
+  public Scan setOneRowLimit() {
+    return setLimit(1).setReadType(ReadType.PREAD);
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public enum ReadType {
+    DEFAULT, STREAM, PREAD
+  }
+
+  /**
+   * @return the read type for this scan
+   */
+  public ReadType getReadType() {
+    return readType;
+  }
+
+  /**
+   * Set the read type for this scan.
+   * <p>
+   * Notice that we may choose to use pread even if you specify {@link ReadType#STREAM} here. For
+   * example, we will always use pread if this is a get scan.
+   * @return this
+   */
+  public Scan setReadType(ReadType readType) {
+    this.readType = readType;
+    return this;
+  }
+
+  /**
    * Get the mvcc read point used to open a scanner.
    */
   long getMvccReadPoint() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 2dee7ce..55be6da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hbase.client;
 
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.UnknownHostException;
@@ -26,10 +29,8 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -37,10 +38,10 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
@@ -51,12 +52,8 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
-
 /**
  * Scanner operations such as create, next, etc.
  * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
@@ -82,9 +79,15 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   private int logCutOffLatency = 1000;
   private static String myAddress;
   protected final int id;
-  protected boolean serverHasMoreResultsContext;
-  protected boolean serverHasMoreResults;
 
+  enum MoreResults {
+    YES, NO, UNKNOWN
+  }
+
+  private MoreResults moreResultsInRegion;
+  private MoreResults moreResultsForScan;
+
+  private boolean openScanner;
   /**
    * Saves whether or not the most recent response from the server was a heartbeat message.
    * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
@@ -136,6 +139,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
     this.controllerFactory = rpcControllerFactory;
+    this.controller = rpcControllerFactory.newController();
   }
 
   PayloadCarryingRpcController getController() {
@@ -189,135 +193,124 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     }
   }
 
+  private ScanResponse next() throws IOException {
+    // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
+    setHeartbeatMessage(false);
+    incRPCcallsMetrics();
+    ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
+      this.scanMetrics != null, renew, scan.getLimit());
+    try {
+      ScanResponse response = getStub().scan(controller, request);
+      nextCallSeq++;
+      return response;
+    } catch (Exception e) {
+      IOException ioe = ProtobufUtil.handleRemoteException(e);
+      if (logScannerActivity) {
+        LOG.info("Got exception making request " + TextFormat.shortDebugString(request) + " to " +
+            getLocation(),
+          e);
+      }
+      if (logScannerActivity) {
+        if (ioe instanceof UnknownScannerException) {
+          try {
+            HRegionLocation location =
+                getConnection().relocateRegion(getTableName(), scan.getStartRow());
+            LOG.info("Scanner=" + scannerId + " expired, current region location is "
+                + location.toString());
+          } catch (Throwable t) {
+            LOG.info("Failed to relocate region", t);
+          }
+        } else if (ioe instanceof ScannerResetException) {
+          LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
+              + "asked us to reset the scanner state.",
+            ioe);
+        }
+      }
+      // The below conversion of exceptions into DoNotRetryExceptions is a little strange.
+      // Why not just have these exceptions implement DNRIOE you ask? Well, usually we want
+      // ServerCallable#withRetries to just retry when it gets these exceptions. In here in
+      // a scan when doing a next in particular, we want to break out and get the scanner to
+      // reset itself up again. Throwing a DNRIOE is how we signal this to happen (it's ugly,
+      // yeah and hard to follow and in need of a refactor).
+      if (ioe instanceof NotServingRegionException) {
+        // Throw a DNRE so that we break out of cycle of calling NSRE
+        // when what we need is to open scanner against new location.
+        // Attach NSRE to signal client that it needs to re-setup scanner.
+        if (this.scanMetrics != null) {
+          this.scanMetrics.countOfNSRE.incrementAndGet();
+        }
+        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
+      } else if (ioe instanceof RegionServerStoppedException) {
+        // Throw a DNRE so that we break out of cycle of the retries and instead go and
+        // open scanner against new location.
+        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
+      } else {
+        // The outer layers will retry
+        throw ioe;
+      }
+    }
+  }
+
+  private void setAlreadyClosed() {
+    this.scannerId = -1L;
+    this.closed = true;
+  }
 
   @Override
-  public Result [] call(int callTimeout) throws IOException {
+  public Result[] call(int callTimeout) throws IOException {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-
-    if (controller == null) {
-      controller = controllerFactory.newController();
-      controller.setPriority(getTableName());
-      controller.setCallTimeout(callTimeout);
-    }
-
     if (closed) {
-      if (scannerId != -1) {
-        close();
+      close();
+      return null;
+    }
+    controller.reset();
+    controller.setPriority(getTableName());
+    controller.setCallTimeout(callTimeout);
+    ScanResponse response;
+    if (this.scannerId == -1L) {
+      this.openScanner = true;
+      response = openScanner();
+    } else {
+      this.openScanner = false;
+      response = next();
+    }
+    long timestamp = System.currentTimeMillis();
+    setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
+    Result[] rrs = ResponseConverter.getResults(controller.cellScanner(), response);
+    if (logScannerActivity) {
+      long now = System.currentTimeMillis();
+      if (now - timestamp > logCutOffLatency) {
+        int rows = rrs == null ? 0 : rrs.length;
+        LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner="
+            + scannerId);
+      }
+    }
+    updateServerSideMetrics(response);
+    // moreResults is only used for the case where a filter exhausts all elements
+    if (response.hasMoreResults()) {
+      if (response.getMoreResults()) {
+        setMoreResultsForScan(MoreResults.YES);
+      } else {
+        setMoreResultsForScan(MoreResults.NO);
+        setAlreadyClosed();
       }
     } else {
-      if (scannerId == -1L) {
-        this.scannerId = openScanner();
+      setMoreResultsForScan(MoreResults.UNKNOWN);
+    }
+    if (response.hasMoreResultsInRegion()) {
+      if (response.getMoreResultsInRegion()) {
+        setMoreResultsInRegion(MoreResults.YES);
       } else {
-        Result [] rrs = null;
-        ScanRequest request = null;
-        // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
-        setHeartbeatMessage(false);
-        try {
-          incRPCcallsMetrics();
-          request =
-              RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
-                this.scanMetrics != null, renew);
-          ScanResponse response = null;
-          try {
-            response = getStub().scan(controller, request);
-            // Client and RS maintain a nextCallSeq number during the scan. Every next() call
-            // from client to server will increment this number in both sides. Client passes this
-            // number along with the request and at RS side both the incoming nextCallSeq and its
-            // nextCallSeq will be matched. In case of a timeout this increment at the client side
-            // should not happen. If at the server side fetching of next batch of data was over,
-            // there will be mismatch in the nextCallSeq number. Server will throw
-            // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
-            // as the last successfully retrieved row.
-            // See HBASE-5974
-            nextCallSeq++;
-            long timestamp = System.currentTimeMillis();
-            setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
-            // Results are returned via controller
-            CellScanner cellScanner = controller.cellScanner();
-            rrs = ResponseConverter.getResults(cellScanner, response);
-            if (logScannerActivity) {
-              long now = System.currentTimeMillis();
-              if (now - timestamp > logCutOffLatency) {
-                int rows = rrs == null ? 0 : rrs.length;
-                LOG.info("Took " + (now-timestamp) + "ms to fetch "
-                  + rows + " rows from scanner=" + scannerId);
-              }
-            }
-            updateServerSideMetrics(response);
-            // moreResults is only used for the case where a filter exhausts all elements
-            if (response.hasMoreResults() && !response.getMoreResults()) {
-              scannerId = -1L;
-              closed = true;
-              // Implied that no results were returned back, either.
-              return null;
-            }
-            // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
-            // to size or quantity of results in the response.
-            if (response.hasMoreResultsInRegion()) {
-              // Set what the RS said
-              setHasMoreResultsContext(true);
-              setServerHasMoreResults(response.getMoreResultsInRegion());
-            } else {
-              // Server didn't respond whether it has more results or not.
-              setHasMoreResultsContext(false);
-            }
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-          updateResultsMetrics(rrs);
-        } catch (IOException e) {
-          if (logScannerActivity) {
-            LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
-              + " to " + getLocation(), e);
-          }
-          IOException ioe = e;
-          if (e instanceof RemoteException) {
-            ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
-          }
-          if (logScannerActivity) {
-            if (ioe instanceof UnknownScannerException) {
-              try {
-                HRegionLocation location =
-                    getConnection().relocateRegion(getTableName(), scan.getStartRow());
-                LOG.info("Scanner=" + scannerId
-                  + " expired, current region location is " + location.toString());
-              } catch (Throwable t) {
-                LOG.info("Failed to relocate region", t);
-              }
-            } else if (ioe instanceof ScannerResetException) {
-              LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
-                  + "asked us to reset the scanner state.", ioe);
-            }
-          }
-          // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
-          // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
-          // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
-          // a scan when doing a next in particular, we want to break out and get the scanner to
-          // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
-          // yeah and hard to follow and in need of a refactor).
-          if (ioe instanceof NotServingRegionException) {
-            // Throw a DNRE so that we break out of cycle of calling NSRE
-            // when what we need is to open scanner against new location.
-            // Attach NSRE to signal client that it needs to re-setup scanner.
-            if (this.scanMetrics != null) {
-              this.scanMetrics.countOfNSRE.incrementAndGet();
-            }
-            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
-          } else if (ioe instanceof RegionServerStoppedException) {
-            // Throw a DNRE so that we break out of cycle of the retries and instead go and
-            // open scanner against new location.
-            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
-          } else {
-            // The outer layers will retry
-            throw ioe;
-          }
-        }
-        return rrs;
+        setMoreResultsInRegion(MoreResults.NO);
+        setAlreadyClosed();
       }
+    } else {
+      setMoreResultsInRegion(MoreResults.UNKNOWN);
     }
-    return null;
+    updateResultsMetrics(rrs);
+    return rrs;
   }
 
   /**
@@ -326,11 +319,11 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
    *         timeouts during long running scan operations.
    */
-  protected boolean isHeartbeatMessage() {
+  boolean isHeartbeatMessage() {
     return heartbeatMessage;
   }
 
-  protected void setHeartbeatMessage(boolean heartbeatMessage) {
+  private void setHeartbeatMessage(boolean heartbeatMessage) {
     this.heartbeatMessage = heartbeatMessage;
   }
 
@@ -397,12 +390,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     this.scannerId = -1L;
   }
 
-  protected long openScanner() throws IOException {
+  private ScanResponse openScanner() throws IOException {
     incRPCcallsMetrics();
-    ScanRequest request =
-      RequestConverter.buildScanRequest(
-        getLocation().getRegionInfo().getRegionName(),
-        this.scan, 0, false);
+    ScanRequest request = RequestConverter.buildScanRequest(
+      getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
     try {
       ScanResponse response = getStub().scan(controller, request);
       long id = response.getScannerId();
@@ -413,9 +404,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
       if (response.hasMvccReadPoint()) {
         this.scan.setMvccReadPoint(response.getMvccReadPoint());
       }
-      return id;
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+      this.scannerId = id;
+      return response;
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
     }
   }
 
@@ -480,27 +472,31 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
 
   /**
    * Should the client attempt to fetch more results from this region
-   * @return True if the client should attempt to fetch more results, false otherwise.
    */
-  protected boolean getServerHasMoreResults() {
-    assert serverHasMoreResultsContext;
-    return this.serverHasMoreResults;
+  MoreResults moreResultsInRegion() {
+    return moreResultsInRegion;
   }
 
-  protected void setServerHasMoreResults(boolean serverHasMoreResults) {
-    this.serverHasMoreResults = serverHasMoreResults;
+  void setMoreResultsInRegion(MoreResults moreResults) {
+    this.moreResultsInRegion = moreResults;
   }
 
   /**
-   * Did the server respond with information about whether more results might exist.
-   * Not guaranteed to respond with older server versions
-   * @return True if the server responded with information about more results.
+   * Should the client attempt to fetch more results for the whole scan.
    */
-  protected boolean hasMoreResultsContext() {
-    return serverHasMoreResultsContext;
+  MoreResults moreResultsForScan() {
+    return moreResultsForScan;
+  }
+
+  void setMoreResultsForScan(MoreResults moreResults) {
+    this.moreResultsForScan = moreResults;
   }
 
-  protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
-    this.serverHasMoreResultsContext = serverHasMoreResultsContext;
+  /**
+   * Whether the previous call was openScanner. This is used to stay compatible with the old
+   * implementation, in which we always return an empty result for openScanner.
+   */
+  boolean isOpenScanner() {
+    return openScanner;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index a030e67..46c8f9c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -30,22 +33,18 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
 import org.apache.hadoop.hbase.util.Pair;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class has the logic for handling scanners for regions with and without replicas.
  * 1. A scan is attempted on the default (primary) region
@@ -115,20 +114,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     return currentScannerCallable.getHRegionInfo();
   }
 
-  public boolean getServerHasMoreResults() {
-    return currentScannerCallable.getServerHasMoreResults();
-  }
-
-  public void setServerHasMoreResults(boolean serverHasMoreResults) {
-    currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
+  public MoreResults moreResultsInRegion() {
+    return currentScannerCallable.moreResultsInRegion();
   }
 
-  public boolean hasMoreResultsContext() {
-    return currentScannerCallable.hasMoreResultsContext();
+  public MoreResults moreResultsForScan() {
+    return currentScannerCallable.moreResultsForScan();
   }
 
-  public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
-    currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
+  public boolean isOpenScanner() {
+    return currentScannerCallable.isOpenScanner();
   }
 
   @Override
@@ -342,7 +337,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       if (callable.getScan().isReversed()) {
         callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
       } else {
-        callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
+        callable.getScan().setStartRow(createClosestRowAfter(this.lastResult.getRow()));
       }
     }
   }


Mime
View raw message