hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-17595 Add partial result support for small/limited scan
Date Thu, 23 Feb 2017 08:07:38 GMT
Repository: hbase
Updated Branches:
  refs/heads/master ff045cab8 -> 8fb44fae3


HBASE-17595 Add partial result support for small/limited scan


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8fb44fae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8fb44fae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8fb44fae

Branch: refs/heads/master
Commit: 8fb44fae35123ab7ddeecfbb80ae5e051c07e111
Parents: ff045ca
Author: zhangduo <zhangduo@apache.org>
Authored: Thu Feb 23 10:12:42 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Thu Feb 23 16:03:42 2017 +0800

----------------------------------------------------------------------
 .../AsyncScanSingleRegionRpcRetryingCaller.java |  36 +--
 .../hadoop/hbase/client/ClientScanner.java      |  25 +-
 .../hbase/client/CompleteScanResultCache.java   |   2 +-
 .../hadoop/hbase/client/ConnectionUtils.java    |  19 +-
 .../org/apache/hadoop/hbase/client/HTable.java  |   4 -
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |  11 +-
 .../org/apache/hadoop/hbase/client/Result.java  |  23 +-
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |   2 +-
 .../hbase/shaded/protobuf/ProtobufUtil.java     |   2 +-
 .../hadoop/hbase/regionserver/HRegion.java      |   2 +-
 .../hbase/regionserver/RSRpcServices.java       |  33 ++-
 .../hbase/regionserver/ScannerContext.java      |   6 +-
 .../hbase/TestPartialResultsFromClientSide.java |  30 ---
 .../client/AbstractTestAsyncTableScan.java      | 156 +++++++++---
 .../hadoop/hbase/client/TestAsyncTableScan.java |  32 +--
 .../hbase/client/TestAsyncTableScanAll.java     |  71 +-----
 .../hbase/client/TestAsyncTableScanner.java     |  32 +--
 .../hbase/client/TestRawAsyncTableScan.java     |  15 +-
 .../client/TestScannersFromClientSide.java      |  71 ------
 .../client/TestScannersFromClientSide2.java     | 254 +++++++++++++++++++
 20 files changed, 483 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index c4e8448..abcc26e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
@@ -31,6 +32,7 @@ import io.netty.util.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -202,7 +204,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
     private ScanResponse resp;
 
-    private int numValidResults;
+    private int numberOfIndividualRows;
 
     // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
     // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
@@ -219,7 +221,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       // resume is called after suspend, then it is also safe to just reference resp and
       // numValidResults after the synchronized block as no one will change it anymore.
       ScanResponse localResp;
-      int localNumValidResults;
+      int localNumberOfIndividualRows;
       synchronized (this) {
         if (state == ScanResumerState.INITIALIZED) {
           // user calls this method before we call prepare, so just set the state to
@@ -236,9 +238,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
           leaseRenewer.cancel();
         }
         localResp = this.resp;
-        localNumValidResults = this.numValidResults;
+        localNumberOfIndividualRows = this.numberOfIndividualRows;
       }
-      completeOrNext(localResp, localNumValidResults);
+      completeOrNext(localResp, localNumberOfIndividualRows);
     }
 
     private void scheduleRenewLeaseTask() {
@@ -258,14 +260,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
     // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
     // for more details.
-    synchronized boolean prepare(ScanResponse resp, int numValidResults) {
+    synchronized boolean prepare(ScanResponse resp, int numberOfIndividualRows) {
       if (state == ScanResumerState.RESUMED) {
         // user calls resume before we actually suspend the scan, just continue;
         return false;
       }
       state = ScanResumerState.SUSPENDED;
       this.resp = resp;
-      this.numValidResults = numValidResults;
+      this.numberOfIndividualRows = numberOfIndividualRows;
       // if there are no more results in region then the scanner at RS side will be closed
       // automatically so we do not need to renew lease.
       if (resp.getMoreResultsInRegion()) {
@@ -402,7 +404,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private void updateNextStartRowWhenError(Result result) {
     nextStartRowWhenError = result.getRow();
-    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
+    includeNextStartRowWhenError = result.hasMoreCellsInRow();
   }
 
   private void completeWhenNoMoreResultsInRegion() {
@@ -421,7 +423,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
   }
 
-  private void completeOrNext(ScanResponse resp, int numValidResults) {
+  private void completeOrNext(ScanResponse resp, int numIndividualRows) {
     if (resp.hasMoreResults() && !resp.getMoreResults()) {
       // RS tells us there is no more data for the whole scan
       completeNoMoreResults();
@@ -429,10 +431,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
     if (scan.getLimit() > 0) {
       // The RS should have set the moreResults field in ScanResponse to false when we have reached
-      // the limit.
-      int limit = scan.getLimit() - numValidResults;
-      assert limit > 0;
-      scan.setLimit(limit);
+      // the limit, so we add an assert here.
+      int newLimit = scan.getLimit() - numIndividualRows;
+      assert newLimit > 0;
+      scan.setLimit(newLimit);
     }
     // as in 2.0 this value will always be set
     if (!resp.getMoreResultsInRegion()) {
@@ -462,10 +464,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       completeWhenError(true);
       return;
     }
-
+    // calculate this before calling onNext as it is free for user to modify the result array in
+    // onNext.
+    int numberOfIndividualRows = numberOfIndividualRows(Arrays.asList(results));
     ScanControllerImpl scanController = new ScanControllerImpl();
     if (results.length == 0) {
-      // if we have nothing to return then this must be a heartbeat message.
+      // if we have nothing to return then just call onHeartbeat.
       consumer.onHeartbeat(scanController);
     } else {
       updateNextStartRowWhenError(results[results.length - 1]);
@@ -482,11 +486,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       return;
     }
     if (state == ScanControllerState.SUSPENDED) {
-      if (scanController.resumer.prepare(resp, results.length)) {
+      if (scanController.resumer.prepare(resp, numberOfIndividualRows)) {
         return;
       }
     }
-    completeOrNext(resp, results.length);
+    completeOrNext(resp, numberOfIndividualRows);
   }
 
   private void call() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 59c52de..47270a7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -51,8 +54,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
  * this scanner will iterate through them all.
@@ -405,7 +406,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
       // If the lastRow is not partial, then we should start from the next row. As now we can
       // exclude the start row, the logic here is the same for both normal scan and reversed scan.
       // If lastResult is partial then include it, otherwise exclude it.
-      scan.withStartRow(lastResult.getRow(), lastResult.isPartial() || scan.getBatch() > 0);
+      scan.withStartRow(lastResult.getRow(), lastResult.hasMoreCellsInRow());
     }
     if (e instanceof OutOfOrderScannerNextException) {
       if (retryAfterOutOfOrderException.isTrue()) {
@@ -496,16 +497,16 @@ public abstract class ClientScanner extends AbstractClientScanner {
           remainingResultSize -= estimatedHeapSizeOfResult;
           addEstimatedSize(estimatedHeapSizeOfResult);
           this.lastResult = rs;
-          if (this.lastResult.isPartial() || scan.getBatch() > 0) {
+          if (this.lastResult.hasMoreCellsInRow()) {
             updateLastCellLoadedToCache(this.lastResult);
           } else {
             this.lastCellLoadedToCache = null;
           }
         }
-        if (scan.getLimit() > 0) {
-          int limit = scan.getLimit() - resultsToAddToCache.size();
-          assert limit >= 0;
-          scan.setLimit(limit);
+        if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) {
+          int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache);
+          assert newLimit >= 0;
+          scan.setLimit(newLimit);
         }
       }
       if (scanExhausted(values)) {
@@ -620,7 +621,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
     // In every RPC response there should be at most a single partial result. Furthermore, if
     // there is a partial result, it is guaranteed to be in the last position of the array.
     Result last = resultsFromServer[resultsFromServer.length - 1];
-    Result partial = last.isPartial() ? last : null;
+    Result partial = last.hasMoreCellsInRow() ? last : null;
 
     if (LOG.isTraceEnabled()) {
       StringBuilder sb = new StringBuilder();
@@ -666,7 +667,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
 
           // If the result is not a partial, it is a signal to us that it is the last Result we
           // need to form the complete Result client-side
-          if (!result.isPartial()) {
+          if (!result.hasMoreCellsInRow()) {
             resultsToAddToCache.add(Result.createCompleteResult(partialResults));
             clearPartialResults();
           }
@@ -682,7 +683,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
           // It's possible that in one response from the server we receive the final partial for
           // one row and receive a partial for a different row. Thus, make sure that all Results
           // are added to the proper list
-          if (result.isPartial()) {
+          if (result.hasMoreCellsInRow()) {
             addToPartialResults(result);
           } else {
             resultsToAddToCache.add(result);
@@ -824,7 +825,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
       index++;
     }
     Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
-    return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
+    return Result.create(list, result.getExists(), result.isStale(), result.hasMoreCellsInRow());
   }
 
   protected void initCache() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
index 9dfb8f7..bc79e04 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
@@ -76,7 +76,7 @@ class CompleteScanResultCache implements ScanResultCache {
     // In every RPC response there should be at most a single partial result. Furthermore, if
     // there is a partial result, it is guaranteed to be in the last position of the array.
     Result last = results[results.length - 1];
-    if (last.isPartial()) {
+    if (last.hasMoreCellsInRow()) {
       if (partialResults.isEmpty()) {
         partialResults.add(last);
         return Arrays.copyOf(results, results.length - 1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 852ffdc..28f0cee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -322,7 +322,7 @@ public final class ConnectionUtils {
       return null;
     }
     return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
-      result.isStale(), result.mayHaveMoreCellsInRow());
+      result.isStale(), result.hasMoreCellsInRow());
   }
 
   // Add a delta to avoid timeout immediately after a retry sleeping.
@@ -381,4 +381,21 @@ public final class ConnectionUtils {
     return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
         .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
   }
+
+  /**
+   * Count the individual rows for the given result list.
+   * <p>
+   * There are two reasons to use this method instead of a simple {@code results.length}.
+   * <ol>
+   * <li>The server may return only some of the cells of a row in the last result, and if
+   * allowPartial is true, we return the array to the user directly. Such a partial last result
+   * should not be counted.</li>
+   * <li>If this is a batched scan, a row may be split into several results, but they should be
+   * counted as one row. For example, a row with 15 cells will be split into 3 results with 5 cells
+   * each if {@code scan.getBatch()} is 5.</li>
+   * </ol>
+   */
+  public static int numberOfIndividualRows(List<Result> results) {
+    return (int) results.stream().filter(r -> !r.hasMoreCellsInRow()).count();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 1fa33b0..0c383fc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -330,10 +330,6 @@ public class HTable implements Table {
    */
   @Override
   public ResultScanner getScanner(Scan scan) throws IOException {
-    if (scan.getBatch() > 0 && scan.isSmall()) {
-      throw new IllegalArgumentException("Small scan should not be used with batching");
-    }
-
     if (scan.getCaching() <= 0) {
       scan.setCaching(scannerCaching);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index fa3d792..7948b65 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -376,15 +376,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
   }
 
   public void scan(Scan scan, RawScanResultConsumer consumer) {
-    if (scan.isSmall() || scan.getLimit() > 0) {
-      if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
-        consumer.onError(new IllegalArgumentException(
-            "Batch and allowPartial is not allowed for small scan or limited scan"));
-      }
-    }
-    scan = setDefaultScanConfig(scan);
-    new AsyncClientScanner(scan, consumer, tableName, conn, pauseNs, maxAttempts, scanTimeoutNs,
-        readRpcTimeoutNs, startLogErrorsCnt).start();
+    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
+        maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 6ed8759..232e3d3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -84,10 +84,9 @@ public class Result implements CellScannable, CellScanner {
   private boolean stale = false;
 
   /**
-   * See {@link #mayHaveMoreCellsInRow()}. And please notice that, The client side implementation
-   * should also check for row key change to determine if a Result is the last one for a row.
+   * See {@link #hasMoreCellsInRow()}.
    */
-  private boolean mayHaveMoreCellsInRow = false;
+  private boolean hasMoreCellsInRow = false;
   // We're not using java serialization.  Transient here is just a marker to say
   // that this is where we cache row if we're ever asked for it.
   private transient byte [] row = null;
@@ -178,7 +177,7 @@ public class Result implements CellScannable, CellScanner {
     this.cells = cells;
     this.exists = exists;
     this.stale = stale;
-    this.mayHaveMoreCellsInRow = mayHaveMoreCellsInRow;
+    this.hasMoreCellsInRow = mayHaveMoreCellsInRow;
     this.readonly = false;
   }
 
@@ -823,7 +822,7 @@ public class Result implements CellScannable, CellScanner {
         // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
         // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
         // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
-        if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
+        if (i != (partialResults.size() - 1) && !r.hasMoreCellsInRow()) {
           throw new IOException(
               "Cannot form complete result. Result is missing partial flag. " +
                   "Partial Results: " + partialResults);
@@ -910,28 +909,26 @@ public class Result implements CellScannable, CellScanner {
    * for a row and should be combined with a result representing the remaining cells in that row to
    * form a complete (non-partial) result.
    * @return Whether or not the result is a partial result
-   * @deprecated the word 'partial' ambiguous, use {@link #mayHaveMoreCellsInRow()} instead.
+   * @deprecated the word 'partial' ambiguous, use {@link #hasMoreCellsInRow()} instead.
    *             Deprecated since 1.4.0.
-   * @see #mayHaveMoreCellsInRow()
+   * @see #hasMoreCellsInRow()
    */
   @Deprecated
   public boolean isPartial() {
-    return mayHaveMoreCellsInRow;
+    return hasMoreCellsInRow;
   }
 
   /**
    * For scanning large rows, the RS may choose to return the cells chunk by chunk to prevent OOM.
    * This flag is used to tell you if the current Result is the last one of the current row. False
-   * means this Result is the last one. True means there may still be more cells for the current
-   * row. Notice that, 'may' have, not must have. This is because we may reach the size or time
-   * limit just at the last cell of row at RS, so we do not know if it is the last one.
+   * means this Result is the last one. True means there are more cells for the current row.
    * <p>
    * The Scan configuration used to control the result size on the server is
    * {@link Scan#setMaxResultSize(long)} and the default value can be seen here:
    * {@link HConstants#DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE}
    */
-  public boolean mayHaveMoreCellsInRow() {
-    return mayHaveMoreCellsInRow;
+  public boolean hasMoreCellsInRow() {
+    return hasMoreCellsInRow;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index d935a08..52ee8a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -1329,7 +1329,7 @@ public final class ProtobufUtil {
     }
 
     builder.setStale(result.isStale());
-    builder.setPartial(result.mayHaveMoreCellsInRow());
+    builder.setPartial(result.hasMoreCellsInRow());
 
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 4b0ba6e..271a0de 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -1443,7 +1443,7 @@ public final class ProtobufUtil {
     }
 
     builder.setStale(result.isStale());
-    builder.setPartial(result.mayHaveMoreCellsInRow());
+    builder.setPartial(result.hasMoreCellsInRow());
 
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4f08be9..a4dc974 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5970,7 +5970,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // If the size limit was reached it means a partial Result is being returned. Returning a
       // partial Result means that we should not reset the filters; filters should only be reset in
       // between rows
-      if (!scannerContext.mayHaveMoreCellsInRow()) {
+      if (!scannerContext.hasMoreCellsInRow()) {
         resetFilters();
       }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 984b965..e6c2a49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -350,16 +350,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     private final String scannerName;
     private final RegionScanner s;
     private final Region r;
-    private final boolean allowPartial;
     private final RpcCallback closeCallBack;
     private final RpcCallback shippedCallback;
 
-    public RegionScannerHolder(String scannerName, RegionScanner s, Region r, boolean allowPartial,
+    public RegionScannerHolder(String scannerName, RegionScanner s, Region r,
         RpcCallback closeCallBack, RpcCallback shippedCallback) {
       this.scannerName = scannerName;
       this.s = s;
       this.r = r;
-      this.allowPartial = allowPartial;
       this.closeCallBack = closeCallBack;
       this.shippedCallback = shippedCallback;
     }
@@ -488,7 +486,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     if (clientCellBlockSupported) {
       for (Result res : results) {
         builder.addCellsPerResult(res.size());
-        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
+        builder.addPartialFlagPerResult(res.hasMoreCellsInRow());
       }
       controller.setCellScanner(CellUtil.createCellScanner(results));
     } else {
@@ -1212,8 +1210,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return lastBlock;
   }
 
-  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r,
-      boolean allowPartial) throws LeaseStillHeldException {
+  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
+      throws LeaseStillHeldException {
     Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
       new ScannerListener(scannerName));
     RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);
@@ -1224,7 +1222,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       closeCallback = new RegionScannerCloseCallBack(s);
     }
     RegionScannerHolder rsh =
-        new RegionScannerHolder(scannerName, s, r, allowPartial, closeCallback, shippedCallback);
+        new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback);
     RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
     return rsh;
@@ -2722,8 +2720,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     builder.setMvccReadPoint(scanner.getMvccReadPoint());
     builder.setTtl(scannerLeaseTimeoutPeriod);
     String scannerName = String.valueOf(scannerId);
-    return addScanner(scannerName, scanner, region,
-      !scan.isSmall() && !(request.hasLimitOfRows() && request.getLimitOfRows() > 0));
+    return addScanner(scannerName, scanner, region);
   }
 
   private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
@@ -2779,7 +2776,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
   // return whether we have more results in region.
   private boolean scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
-      long maxQuotaResultSize, int rows, List<Result> results, ScanResponse.Builder builder,
+      long maxQuotaResultSize, int maxResults, List<Result> results, ScanResponse.Builder builder,
       MutableObject lastBlock, RpcCallContext context) throws IOException {
     Region region = rsh.r;
     RegionScanner scanner = rsh.s;
@@ -2810,8 +2807,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         // correct ordering of partial results and so we prevent partial results from being
         // formed.
         boolean serverGuaranteesOrderOfPartials = results.isEmpty();
-        boolean allowPartialResults =
-            clientHandlesPartials && serverGuaranteesOrderOfPartials && rsh.allowPartial;
+        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
         boolean moreRows = false;
 
         // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
@@ -2843,7 +2839,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         contextBuilder.setTrackMetrics(trackMetrics);
         ScannerContext scannerContext = contextBuilder.build();
         boolean limitReached = false;
-        while (i < rows) {
+        while (i < maxResults) {
           // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
           // batch limit is a limit on the number of cells per Result. Thus, if progress is
           // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
@@ -2855,7 +2851,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           moreRows = scanner.nextRaw(values, scannerContext);
 
           if (!values.isEmpty()) {
-            Result r = Result.create(values, null, stale, scannerContext.mayHaveMoreCellsInRow());
+            Result r = Result.create(values, null, stale, scannerContext.hasMoreCellsInRow());
             lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
             results.add(r);
             i++;
@@ -2863,7 +2859,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
           boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
           boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
-          boolean rowLimitReached = i >= rows;
+          boolean rowLimitReached = i >= maxResults;
           limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
 
           if (limitReached || !moreRows) {
@@ -2920,7 +2916,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     // coprocessor postNext hook
     if (region.getCoprocessorHost() != null) {
-      region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
     }
     return builder.getMoreResultsInRegion();
   }
@@ -3073,8 +3069,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         // with the old scan implementation where we just ignore the returned results if moreResults
         // is false. Can remove the isEmpty check after we get rid of the old implementation.
         moreResults = false;
-      } else if (limitOfRows > 0 && results.size() >= limitOfRows
-          && !results.get(results.size() - 1).mayHaveMoreCellsInRow()) {
+      } else if (limitOfRows > 0 && !results.isEmpty() &&
+          !results.get(results.size() - 1).hasMoreCellsInRow() &&
+          ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) {
         // if we have reached the limit of rows
         moreResults = false;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index ad32772..15e2ec0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -225,10 +225,10 @@ public class ScannerContext {
   }
 
   /**
-   * @return true when we may have more cells for the current row. This usually because we have
-   *         reached a limit in the middle of a row
+   * @return true when we have more cells for the current row. This is usually because we have
+   *         reached a limit in the middle of a row
    */
-  boolean mayHaveMoreCellsInRow() {
+  boolean hasMoreCellsInRow() {
     return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW ||
         scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW ||
         scannerState == NextState.BATCH_LIMIT_REACHED;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index 2ebfa6a..83f3101 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -595,36 +595,6 @@ public class TestPartialResultsFromClientSide {
   }
 
   /**
-   * Small scans should not return partial results because it would prevent small scans from
-   * retrieving all of the necessary results in a single RPC request which is what makese small
-   * scans useful. Thus, ensure that even when {@link Scan#getAllowPartialResults()} is true, small
-   * scans do not return partial results
-   * @throws Exception
-   */
-  @Test
-  public void testSmallScansDoNotAllowPartials() throws Exception {
-    Scan scan = new Scan();
-    testSmallScansDoNotAllowPartials(scan);
-    scan.setReversed(true);
-    testSmallScansDoNotAllowPartials(scan);
-  }
-
-  public void testSmallScansDoNotAllowPartials(Scan baseScan) throws Exception {
-    Scan scan = new Scan(baseScan);
-    scan.setAllowPartialResults(true);
-    scan.setSmall(true);
-    scan.setMaxResultSize(1);
-    ResultScanner scanner = TABLE.getScanner(scan);
-    Result r = null;
-
-    while ((r = scanner.next()) != null) {
-      assertFalse(r.isPartial());
-    }
-
-    scanner.close();
-  }
-
-  /**
    * Make puts to put the input value into each combination of row, family, and qualifier
    * @param rows
    * @param families

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
index d0c9806..7bab894 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
@@ -24,12 +24,14 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -72,20 +74,41 @@ public abstract class AbstractTestAsyncTableScan {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  protected abstract Scan createScan();
+  protected static Scan createNormalScan() {
+    return new Scan();
+  }
 
-  protected abstract List<Result> doScan(Scan scan) throws Exception;
+  protected static Scan createBatchScan() {
+    return new Scan().setBatch(1);
+  }
+
+  // set a small result size for testing flow control
+  protected static Scan createSmallResultSizeScan() {
+    return new Scan().setMaxResultSize(1);
+  }
+
+  protected static Scan createBatchSmallResultSizeScan() {
+    return new Scan().setBatch(1).setMaxResultSize(1);
+  }
 
-  private Result convertToPartial(Result result) {
-    return Result.create(result.rawCells(), result.getExists(), result.isStale(), true);
+  protected static List<Pair<String, Supplier<Scan>>> getScanCreater() {
+    return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan),
+      Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan),
+      Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan),
+      Pair.newPair("batchSmallResultSize",
+        AbstractTestAsyncTableScan::createBatchSmallResultSizeScan));
   }
 
+  protected abstract Scan createScan();
+
+  protected abstract List<Result> doScan(Scan scan) throws Exception;
+
   protected final List<Result> convertFromBatchResult(List<Result> results) {
     assertTrue(results.size() % 2 == 0);
     return IntStream.range(0, results.size() / 2).mapToObj(i -> {
       try {
-        return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)),
-          convertToPartial(results.get(2 * i + 1))));
+        return Result
+            .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1)));
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
@@ -98,8 +121,8 @@ public abstract class AbstractTestAsyncTableScan {
     // make sure all scanners are closed at RS side
     TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
         .forEach(rs -> assertEquals(
-          "The scanner count of " + rs.getServerName() + " is "
-              + rs.getRSRpcServices().getScannersCount(),
+          "The scanner count of " + rs.getServerName() + " is " +
+              rs.getRSRpcServices().getScannersCount(),
           0, rs.getRSRpcServices().getScannersCount()));
     assertEquals(COUNT, results.size());
     IntStream.range(0, COUNT).forEach(i -> {
@@ -140,61 +163,112 @@ public abstract class AbstractTestAsyncTableScan {
     IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
   }
 
-  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive)
-      throws Exception {
-    List<Result> results = doScan(
-      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
-          .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive));
+  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
+      int limit) throws Exception {
+    Scan scan =
+        createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
+            .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
+    if (limit > 0) {
+      scan.setLimit(limit);
+    }
+    List<Result> results = doScan(scan);
     int actualStart = startInclusive ? start : start + 1;
     int actualStop = stopInclusive ? stop + 1 : stop;
-    assertEquals(actualStop - actualStart, results.size());
-    IntStream.range(0, actualStop - actualStart)
-        .forEach(i -> assertResultEquals(results.get(i), actualStart + i));
+    int count = actualStop - actualStart;
+    if (limit > 0) {
+      count = Math.min(count, limit);
+    }
+    assertEquals(count, results.size());
+    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
   }
 
-  private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive)
-      throws Exception {
-    List<Result> results = doScan(createScan()
+  private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
+      int limit) throws Exception {
+    Scan scan = createScan()
         .withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
-        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true));
+        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
+    if (limit > 0) {
+      scan.setLimit(limit);
+    }
+    List<Result> results = doScan(scan);
     int actualStart = startInclusive ? start : start - 1;
     int actualStop = stopInclusive ? stop - 1 : stop;
-    assertEquals(actualStart - actualStop, results.size());
-    IntStream.range(0, actualStart - actualStop)
-        .forEach(i -> assertResultEquals(results.get(i), actualStart - i));
+    int count = actualStart - actualStop;
+    if (limit > 0) {
+      count = Math.min(count, limit);
+    }
+    assertEquals(count, results.size());
+    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
   }
 
   @Test
   public void testScanWithStartKeyAndStopKey() throws Exception {
-    testScan(1, true, 998, false); // from first region to last region
-    testScan(123, true, 345, true);
-    testScan(234, true, 456, false);
-    testScan(345, false, 567, true);
-    testScan(456, false, 678, false);
+    testScan(1, true, 998, false, -1); // from first region to last region
+    testScan(123, true, 345, true, -1);
+    testScan(234, true, 456, false, -1);
+    testScan(345, false, 567, true, -1);
+    testScan(456, false, 678, false, -1);
   }
 
   @Test
   public void testReversedScanWithStartKeyAndStopKey() throws Exception {
-    testReversedScan(998, true, 1, false); // from last region to first region
-    testReversedScan(543, true, 321, true);
-    testReversedScan(654, true, 432, false);
-    testReversedScan(765, false, 543, true);
-    testReversedScan(876, false, 654, false);
+    testReversedScan(998, true, 1, false, -1); // from last region to first region
+    testReversedScan(543, true, 321, true, -1);
+    testReversedScan(654, true, 432, false, -1);
+    testReversedScan(765, false, 543, true, -1);
+    testReversedScan(876, false, 654, false, -1);
   }
 
   @Test
   public void testScanAtRegionBoundary() throws Exception {
-    testScan(222, true, 333, true);
-    testScan(333, true, 444, false);
-    testScan(444, false, 555, true);
-    testScan(555, false, 666, false);
+    testScan(222, true, 333, true, -1);
+    testScan(333, true, 444, false, -1);
+    testScan(444, false, 555, true, -1);
+    testScan(555, false, 666, false, -1);
   }
 
   @Test
   public void testReversedScanAtRegionBoundary() throws Exception {
-    testReversedScan(333, true, 222, true);
-    testReversedScan(444, true, 333, false);
-    testReversedScan(555, false, 444, true);
-    testReversedScan(666, false, 555, false);
+    testReversedScan(333, true, 222, true, -1);
+    testReversedScan(444, true, 333, false, -1);
+    testReversedScan(555, false, 444, true, -1);
+    testReversedScan(666, false, 555, false, -1);
+  }
+
+  @Test
+  public void testScanWithLimit() throws Exception {
+    testScan(1, true, 998, false, 900); // from first region to last region
+    testScan(123, true, 345, true, 100);
+    testScan(234, true, 456, false, 100);
+    testScan(345, false, 567, true, 100);
+    testScan(456, false, 678, false, 100);
+
+  }
+
+  @Test
+  public void testScanWithLimitGreaterThanActualCount() throws Exception {
+    testScan(1, true, 998, false, 1000); // from first region to last region
+    testScan(123, true, 345, true, 200);
+    testScan(234, true, 456, false, 200);
+    testScan(345, false, 567, true, 200);
+    testScan(456, false, 678, false, 200);
+  }
+
+  @Test
+  public void testReversedScanWithLimit() throws Exception {
+    testReversedScan(998, true, 1, false, 900); // from last region to first region
+    testReversedScan(543, true, 321, true, 100);
+    testReversedScan(654, true, 432, false, 100);
+    testReversedScan(765, false, 543, true, 100);
+    testReversedScan(876, false, 654, false, 100);
+  }
+
+  @Test
+  public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
+    testReversedScan(998, true, 1, false, 1000); // from last region to first region
+    testReversedScan(543, true, 321, true, 200);
+    testReversedScan(654, true, 432, false, 200);
+    testReversedScan(765, false, 543, true, 200);
+    testReversedScan(876, false, 654, false, 200);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
index f151e83..a8aad0b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.client;
 import com.google.common.base.Throwables;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ForkJoinPool;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -76,32 +76,16 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
     }
   }
 
-  @Parameter
+  @Parameter(0)
+  public String scanType;
+
+  @Parameter(1)
   public Supplier<Scan> scanCreater;
 
-  @Parameters
+  @Parameters(name = "{index}: scan={0}")
   public static List<Object[]> params() {
-    return Arrays.asList(new Supplier<?>[] { TestAsyncTableScan::createNormalScan },
-      new Supplier<?>[] { TestAsyncTableScan::createBatchScan },
-      new Supplier<?>[] { TestAsyncTableScan::createSmallResultSizeScan },
-      new Supplier<?>[] { TestAsyncTableScan::createBatchSmallResultSizeScan });
-  }
-
-  private static Scan createNormalScan() {
-    return new Scan();
-  }
-
-  private static Scan createBatchScan() {
-    return new Scan().setBatch(1);
-  }
-
-  // set a small result size for testing flow control
-  private static Scan createSmallResultSizeScan() {
-    return new Scan().setMaxResultSize(1);
-  }
-
-  private static Scan createBatchSmallResultSizeScan() {
-    return new Scan().setBatch(1).setMaxResultSize(1);
+    return getScanCreater().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
+        .collect(Collectors.toList());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java
index a9a3e43..1b414b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java
@@ -17,20 +17,14 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.function.Supplier;
-import java.util.stream.IntStream;
+import java.util.stream.Collectors;
 
-import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -61,63 +55,14 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
     return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
   }
 
-  private static Scan createNormalScan() {
-    return new Scan();
-  }
-
-  // test if we can handle partial result when open scanner.
-  private static Scan createSmallResultSizeScan() {
-    return new Scan().setMaxResultSize(1);
-  }
-
   @Parameters(name = "{index}: table={0}, scan={2}")
   public static List<Object[]> params() {
     Supplier<AsyncTableBase> rawTable = TestAsyncTableScanAll::getRawTable;
     Supplier<AsyncTableBase> normalTable = TestAsyncTableScanAll::getTable;
-    Supplier<Scan> normalScan = TestAsyncTableScanAll::createNormalScan;
-    Supplier<Scan> smallResultSizeScan = TestAsyncTableScanAll::createSmallResultSizeScan;
-    return Arrays.asList(new Object[] { "raw", rawTable, "normal", normalScan },
-      new Object[] { "raw", rawTable, "smallResultSize", smallResultSizeScan },
-      new Object[] { "normal", normalTable, "normal", normalScan },
-      new Object[] { "normal", normalTable, "smallResultSize", smallResultSizeScan });
-  }
-
-  @Test
-  public void testScanWithLimit() throws InterruptedException, ExecutionException {
-    int start = 111;
-    int stop = 888;
-    int limit = 300;
-    List<Result> results = getTable.get()
-        .scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start)))
-            .withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit)
-            .setReadType(ReadType.PREAD))
-        .get();
-    assertEquals(limit, results.size());
-    IntStream.range(0, limit).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = start + i;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
-    });
-  }
-
-  @Test
-  public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
-    int start = 888;
-    int stop = 111;
-    int limit = 300;
-    List<Result> results = getTable.get()
-        .scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start)))
-            .withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit)
-            .setReadType(ReadType.PREAD).setReversed(true))
-        .get();
-    assertEquals(limit, results.size());
-    IntStream.range(0, limit).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = start - i;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
-    });
+    return getScanCreater().stream()
+        .flatMap(p -> Arrays.asList(new Object[] { "raw", rawTable, p.getFirst(), p.getSecond() },
+          new Object[] { "normal", normalTable, p.getFirst(), p.getSecond() }).stream())
+        .collect(Collectors.toList());
   }
 
   @Override
@@ -127,6 +72,10 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
 
   @Override
   protected List<Result> doScan(Scan scan) throws Exception {
-    return getTable.get().scanAll(scan).get();
+    List<Result> results = getTable.get().scanAll(scan).get();
+    if (scan.getBatch() > 0) {
+      results = convertFromBatchResult(results);
+    }
+    return results;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java
index a3cad17..cefc882 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java
@@ -18,10 +18,10 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ForkJoinPool;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -35,32 +35,16 @@ import org.junit.runners.Parameterized.Parameters;
 @Category({ LargeTests.class, ClientTests.class })
 public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
 
-  @Parameter
+  @Parameter(0)
+  public String scanType;
+
+  @Parameter(1)
   public Supplier<Scan> scanCreater;
 
-  @Parameters
+  @Parameters(name = "{index}: scan={0}")
   public static List<Object[]> params() {
-    return Arrays.asList(new Supplier<?>[] { TestAsyncTableScanner::createNormalScan },
-      new Supplier<?>[] { TestAsyncTableScanner::createBatchScan },
-      new Supplier<?>[] { TestAsyncTableScanner::createSmallResultSizeScan },
-      new Supplier<?>[] { TestAsyncTableScanner::createBatchSmallResultSizeScan });
-  }
-
-  private static Scan createNormalScan() {
-    return new Scan();
-  }
-
-  private static Scan createBatchScan() {
-    return new Scan().setBatch(1);
-  }
-
-  // set a small result size for testing flow control
-  private static Scan createSmallResultSizeScan() {
-    return new Scan().setMaxResultSize(1);
-  }
-
-  private static Scan createBatchSmallResultSizeScan() {
-    return new Scan().setBatch(1).setMaxResultSize(1);
+    return getScanCreater().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
+        .collect(Collectors.toList());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
index 0be236d..72179c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
@@ -22,10 +22,10 @@ import com.google.common.base.Throwables;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Queue;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -94,17 +94,8 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
 
   @Parameters(name = "{index}: type={0}")
   public static List<Object[]> params() {
-    Supplier<Scan> normal = TestRawAsyncTableScan::createNormalScan;
-    Supplier<Scan> batch = TestRawAsyncTableScan::createBatchScan;
-    return Arrays.asList(new Object[] { "normal", normal }, new Object[] { "batch", batch });
-  }
-
-  private static Scan createNormalScan() {
-    return new Scan();
-  }
-
-  private static Scan createBatchScan() {
-    return new Scan().setBatch(1);
+    return getScanCreater().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
+        .collect(Collectors.toList());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 7e93f97..42fecfb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -766,74 +765,4 @@ public class TestScannersFromClientSide {
 
     assertEquals(expKvList.size(), result.size());
   }
-
-  private void assertResultEquals(Result result, int i) {
-    assertEquals(String.format("%02d", i), Bytes.toString(result.getRow()));
-    assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
-  }
-
-  private void testStartRowStopRowInclusive(Table table, int start, boolean startInclusive,
-      int stop, boolean stopInclusive) throws IOException {
-    int actualStart = startInclusive ? start : start + 1;
-    int actualStop = stopInclusive ? stop + 1 : stop;
-    int expectedCount = actualStop - actualStart;
-    Result[] results;
-    try (ResultScanner scanner = table.getScanner(
-      new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive)
-          .withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive))) {
-      results = scanner.next(expectedCount);
-    }
-    assertEquals(expectedCount, results.length);
-    for (int i = 0; i < expectedCount; i++) {
-      assertResultEquals(results[i], actualStart + i);
-    }
-  }
-
-  private void testReversedStartRowStopRowInclusive(Table table, int start, boolean startInclusive,
-      int stop, boolean stopInclusive) throws IOException {
-    int actualStart = startInclusive ? start : start - 1;
-    int actualStop = stopInclusive ? stop - 1 : stop;
-    int expectedCount = actualStart - actualStop;
-    Result[] results;
-    try (ResultScanner scanner = table.getScanner(
-      new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive)
-          .withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive)
-          .setReversed(true))) {
-      results = scanner.next(expectedCount);
-    }
-    assertEquals(expectedCount, results.length);
-    for (int i = 0; i < expectedCount; i++) {
-      assertResultEquals(results[i], actualStart - i);
-    }
-  }
-
-  @Test
-  public void testStartRowStopRowInclusive() throws IOException, InterruptedException {
-    TableName tableName = TableName.valueOf("testStartRowStopRowInclusive");
-    byte[][] splitKeys = new byte[8][];
-    for (int i = 11; i < 99; i += 11) {
-      splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i));
-    }
-    Table table = TEST_UTIL.createTable(tableName, FAMILY, splitKeys);
-    TEST_UTIL.waitTableAvailable(tableName);
-    try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(tableName)) {
-      for (int i = 0; i < 100; i++) {
-        mutator.mutate(new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, QUALIFIER,
-          Bytes.toBytes(i)));
-      }
-    }
-    // from first region to last region
-    testStartRowStopRowInclusive(table, 1, true, 98, false);
-    testStartRowStopRowInclusive(table, 12, true, 34, true);
-    testStartRowStopRowInclusive(table, 23, true, 45, false);
-    testStartRowStopRowInclusive(table, 34, false, 56, true);
-    testStartRowStopRowInclusive(table, 45, false, 67, false);
-
-    // from last region to first region
-    testReversedStartRowStopRowInclusive(table, 98, true, 1, false);
-    testReversedStartRowStopRowInclusive(table, 54, true, 32, true);
-    testReversedStartRowStopRowInclusive(table, 65, true, 43, false);
-    testReversedStartRowStopRowInclusive(table, 76, false, 54, true);
-    testReversedStartRowStopRowInclusive(table, 87, false, 65, false);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fb44fae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java
new file mode 100644
index 0000000..728a8f9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Testcase for the scan features added in HBASE-17143, such as inclusive/exclusive startRow and
+ * stopRow, limit for rows, etc.
+ * <p>
+ * The class is parameterized over {@link #batch}, {@link #smallResultSize} and
+ * {@link #allowPartial}, so every test method runs once for each of the 8 combinations. Whatever
+ * the combination, the results are stitched back into complete rows before being verified.
+ */
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestScannersFromClientSide2 {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("scan");
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static final byte[] CQ1 = Bytes.toBytes("cq1");
+
+  private static final byte[] CQ2 = Bytes.toBytes("cq2");
+
+  /** When true, the scan is created with {@code setBatch(1)}. */
+  @Parameter(0)
+  public boolean batch;
+
+  /** When true, the scan is created with {@code setMaxResultSize(1)}. */
+  @Parameter(1)
+  public boolean smallResultSize;
+
+  /** When true, the scan is created with {@code setAllowPartialResults(true)}. */
+  @Parameter(2)
+  public boolean allowPartial;
+
+  /**
+   * Builds the parameter matrix.
+   * @return all 8 combinations of (batch, smallResultSize, allowPartial), with the last parameter
+   *         varying fastest
+   */
+  @Parameters(name = "{index}: batch={0}, smallResultSize={1}, allowPartial={2}")
+  public static List<Object[]> params() {
+    boolean[] values = new boolean[] { false, true };
+    List<Object[]> params = new ArrayList<>();
+    for (boolean b : values) {
+      for (boolean s : values) {
+        for (boolean p : values) {
+          params.add(new Object[] { b, s, p });
+        }
+      }
+    }
+    return params;
+  }
+
+  /**
+   * Starts a 3 node mini cluster and creates the test table, pre-split at the 8 keys "111",
+   * "222", ..., "888". The table is loaded with rows "000" to "999", where row {@code i} stores
+   * {@code i} under cq1 and {@code i * i} under cq2.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    byte[][] splitKeys = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    List<Put> puts = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      puts.add(new Put(Bytes.toBytes(String.format("%03d", i)))
+          .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)));
+    }
+    // Close the table handle once the data is loaded; the tests open their own in doScan.
+    try (Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys)) {
+      TEST_UTIL.waitTableAvailable(TABLE_NAME);
+      table.put(puts);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Creates a scan configured according to the current test parameters.
+   * @return a new scan with batch, max result size and allow-partial set as the parameters demand
+   */
+  private Scan createScan() {
+    Scan scan = new Scan();
+    if (batch) {
+      scan.setBatch(1);
+    }
+    if (smallResultSize) {
+      scan.setMaxResultSize(1);
+    }
+    if (allowPartial) {
+      scan.setAllowPartialResults(true);
+    }
+    return scan;
+  }
+
+  /**
+   * Asserts that the given (complete) result is the row written for index {@code i} in
+   * {@link #setUp()}: row key is {@code i} zero padded to 3 digits, cq1 is {@code i} and cq2 is
+   * {@code i * i}.
+   */
+  private void assertResultEquals(Result result, int i) {
+    assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
+    assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
+    assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
+  }
+
+  /**
+   * Runs the scan to exhaustion against the test table.
+   * @param scan the scan to execute
+   * @return the scanned rows as complete results, see
+   *         {@link #assertAndCreateCompleteResults(List)}
+   */
+  private List<Result> doScan(Scan scan) throws IOException {
+    List<Result> results = new ArrayList<>();
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
+        ResultScanner scanner = table.getScanner(scan)) {
+      for (Result r; (r = scanner.next()) != null;) {
+        results.add(r);
+      }
+    }
+    return assertAndCreateCompleteResults(results);
+  }
+
+  /**
+   * Checks the partial-ness of the raw results against what the current parameters should
+   * produce, and merges partial results back into complete rows.
+   * <p>
+   * For the parameter combinations handled by the first branch no result may be flagged as
+   * having more cells in its row, and the list is returned as is. For all other combinations
+   * every row must arrive as at least one partial result followed by a final piece; the pieces
+   * are combined with {@link Result#createCompleteResult}.
+   * @param results the results in the order the scanner returned them
+   * @return one complete result per row
+   */
+  private List<Result> assertAndCreateCompleteResults(List<Result> results) throws IOException {
+    if ((!batch && !allowPartial) || (allowPartial && !batch && !smallResultSize)) {
+      for (Result result : results) {
+        assertFalse("Should not have partial result", result.hasMoreCellsInRow());
+      }
+      return results;
+    }
+    List<Result> completeResults = new ArrayList<>();
+    List<Result> partialResults = new ArrayList<>();
+    for (Result result : results) {
+      if (!result.hasMoreCellsInRow()) {
+        // Last piece of a row: there must be earlier pieces pending, otherwise the row was not
+        // split at all.
+        assertFalse("Should have partial result", partialResults.isEmpty());
+        partialResults.add(result);
+        completeResults.add(Result.createCompleteResult(partialResults));
+        partialResults.clear();
+      } else {
+        partialResults.add(result);
+      }
+    }
+    assertTrue("Should not have orphan partial result", partialResults.isEmpty());
+    return completeResults;
+  }
+
+  /**
+   * Runs a forward scan over the given row range and verifies the number and content of the
+   * returned rows.
+   * @param start index of the start row
+   * @param startInclusive whether the start row is included
+   * @param stop index of the stop row
+   * @param stopInclusive whether the stop row is included
+   * @param limit row limit to set on the scan, a value of 0 or less means no limit is set
+   */
+  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
+      int limit) throws Exception {
+    Scan scan =
+        createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
+            .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
+    if (limit > 0) {
+      scan.setLimit(limit);
+    }
+    List<Result> results = doScan(scan);
+    // Normalize to the half-open range [actualStart, actualStop).
+    int actualStart = startInclusive ? start : start + 1;
+    int actualStop = stopInclusive ? stop + 1 : stop;
+    int count = actualStop - actualStart;
+    if (limit > 0) {
+      count = Math.min(count, limit);
+    }
+    assertEquals(count, results.size());
+    for (int i = 0; i < count; i++) {
+      assertResultEquals(results.get(i), actualStart + i);
+    }
+  }
+
+  /**
+   * Runs a reversed scan over the given row range and verifies the number and content of the
+   * returned rows. Since the scan is reversed, {@code start} is expected to be the larger index.
+   * @param start index of the start row
+   * @param startInclusive whether the start row is included
+   * @param stop index of the stop row
+   * @param stopInclusive whether the stop row is included
+   * @param limit row limit to set on the scan, a value of 0 or less means no limit is set
+   */
+  private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
+      int limit) throws Exception {
+    Scan scan = createScan()
+        .withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
+        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
+    if (limit > 0) {
+      scan.setLimit(limit);
+    }
+    List<Result> results = doScan(scan);
+    // Normalize to the descending half-open range (actualStop, actualStart].
+    int actualStart = startInclusive ? start : start - 1;
+    int actualStop = stopInclusive ? stop - 1 : stop;
+    int count = actualStart - actualStop;
+    if (limit > 0) {
+      count = Math.min(count, limit);
+    }
+    assertEquals(count, results.size());
+    for (int i = 0; i < count; i++) {
+      assertResultEquals(results.get(i), actualStart - i);
+    }
+  }
+
+  @Test
+  public void testScanWithLimit() throws Exception {
+    testScan(1, true, 998, false, 900); // from first region to last region
+    testScan(123, true, 345, true, 100);
+    testScan(234, true, 456, false, 100);
+    testScan(345, false, 567, true, 100);
+    testScan(456, false, 678, false, 100);
+  }
+
+  @Test
+  public void testScanWithLimitGreaterThanActualCount() throws Exception {
+    testScan(1, true, 998, false, 1000); // from first region to last region
+    testScan(123, true, 345, true, 200);
+    testScan(234, true, 456, false, 200);
+    testScan(345, false, 567, true, 200);
+    testScan(456, false, 678, false, 200);
+  }
+
+  // The @Test annotation was missing here, so JUnit silently skipped this test.
+  @Test
+  public void testReversedScanWithLimit() throws Exception {
+    testReversedScan(998, true, 1, false, 900); // from last region to first region
+    testReversedScan(543, true, 321, true, 100);
+    testReversedScan(654, true, 432, false, 100);
+    testReversedScan(765, false, 543, true, 100);
+    testReversedScan(876, false, 654, false, 100);
+  }
+
+  @Test
+  public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
+    testReversedScan(998, true, 1, false, 1000); // from last region to first region
+    testReversedScan(543, true, 321, true, 200);
+    testReversedScan(654, true, 432, false, 200);
+    testReversedScan(765, false, 543, true, 200);
+    testReversedScan(876, false, 654, false, 200);
+  }
+
+  @Test
+  public void testStartRowStopRowInclusive() throws Exception {
+    testScan(1, true, 998, false, -1); // from first region to last region
+    testScan(123, true, 345, true, -1);
+    testScan(234, true, 456, false, -1);
+    testScan(345, false, 567, true, -1);
+    testScan(456, false, 678, false, -1);
+  }
+
+  @Test
+  public void testReversedStartRowStopRowInclusive() throws Exception {
+    testReversedScan(998, true, 1, false, -1); // from last region to first region
+    testReversedScan(543, true, 321, true, -1);
+    testReversedScan(654, true, 432, false, -1);
+    testReversedScan(765, false, 543, true, -1);
+    testReversedScan(876, false, 654, false, -1);
+  }
+}


Mime
View raw message