hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [3/3] hbase git commit: HBASE-17045 Unify the implementation of small scan and regular scan
Date Wed, 25 Jan 2017 01:53:50 GMT
HBASE-17045 Unify the implementation of small scan and regular scan


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/85d70189
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/85d70189
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/85d70189

Branch: refs/heads/master
Commit: 85d701892ed969380a8bcca9c9f4e306c74af941
Parents: 616f480
Author: zhangduo <zhangduo@apache.org>
Authored: Tue Jan 24 21:07:25 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Wed Jan 25 09:53:06 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncClientScanner.java |  23 +-
 .../hbase/client/AsyncNonMetaRegionLocator.java |   4 +-
 .../client/AsyncRpcRetryingCallerFactory.java   |  84 +--
 .../AsyncScanSingleRegionRpcRetryingCaller.java |  49 +-
 .../client/AsyncSmallScanRpcRetryingCaller.java | 194 -------
 .../hadoop/hbase/client/AsyncTableBase.java     |  40 +-
 .../hadoop/hbase/client/AsyncTableImpl.java     |   5 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |  56 +-
 .../hbase/client/RawScanResultConsumer.java     |   4 +-
 .../org/apache/hadoop/hbase/client/Scan.java    | 146 +++--
 .../hadoop/hbase/client/ScannerCallable.java    |   2 +-
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |  34 ++
 .../hbase/shaded/protobuf/ProtobufUtil.java     |  35 ++
 .../hbase/shaded/protobuf/RequestConverter.java |  24 +-
 .../shaded/protobuf/generated/ClientProtos.java | 553 ++++++++++++++----
 .../src/main/protobuf/Client.proto              |  11 +-
 .../hbase/protobuf/generated/ClientProtos.java  | 556 +++++++++++++++----
 hbase-protocol/src/main/protobuf/Client.proto   |  11 +-
 .../hbase/regionserver/RSRpcServices.java       |  18 +
 .../hadoop/hbase/regionserver/StoreScanner.java |  38 +-
 .../client/AbstractTestAsyncTableScan.java      |   6 +
 .../hbase/client/TestAsyncTableScanAll.java     | 132 +++++
 .../hbase/client/TestAsyncTableSmallScan.java   | 109 ----
 .../hbase/client/TestRawAsyncTableScan.java     |   5 -
 .../hbase/filter/TestMultiRowRangeFilter.java   |   4 +
 25 files changed, 1385 insertions(+), 758 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index f656a6c..b9fd34f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 
 /**
  * The asynchronous client scanner implementation.
@@ -95,12 +96,16 @@ class AsyncClientScanner {
 
     public final ClientService.Interface stub;
 
-    public final long scannerId;
+    public final HBaseRpcController controller;
 
-    public OpenScannerResponse(HRegionLocation loc, Interface stub, long scannerId) {
+    public final ScanResponse resp;
+
+    public OpenScannerResponse(HRegionLocation loc, Interface stub, HBaseRpcController controller,
+        ScanResponse resp) {
       this.loc = loc;
       this.stub = stub;
-      this.scannerId = scannerId;
+      this.controller = controller;
+      this.resp = resp;
     }
   }
 
@@ -108,14 +113,14 @@ class AsyncClientScanner {
       HRegionLocation loc, ClientService.Interface stub) {
     CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
     try {
-      ScanRequest request =
-          RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, 0, false);
+      ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(),
+        scan, scan.getCaching(), false);
       stub.scan(controller, request, resp -> {
         if (controller.failed()) {
           future.completeExceptionally(controller.getFailed());
           return;
         }
-        future.complete(new OpenScannerResponse(loc, stub, resp.getScannerId()));
+        future.complete(new OpenScannerResponse(loc, stub, controller, resp));
       });
     } catch (IOException e) {
       future.completeExceptionally(e);
@@ -124,11 +129,11 @@ class AsyncClientScanner {
   }
 
   private void startScan(OpenScannerResponse resp) {
-    conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub)
-        .setScan(scan).consumer(consumer).resultCache(resultCache)
+    conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
+        .stub(resp.stub).setScan(scan).consumer(consumer).resultCache(resultCache)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
         .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start()
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp)
         .whenComplete((hasMore, error) -> {
           if (error != null) {
             consumer.onError(error);

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index ae79b65..2c8669f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -376,7 +377,8 @@ class AsyncNonMetaRegionLocator {
       metaKey = createRegionName(tableName, req.row, NINES, false);
     }
     conn.getRawTable(META_TABLE_NAME)
-        .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+        .scanAll(new Scan().withStartRow(metaKey).setReversed(true).setReadType(ReadType.PREAD)
+            .addFamily(CATALOG_FAMILY).setLimit(1))
         .whenComplete((results, error) -> onScanComplete(tableName, req, results, error));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 5df66cc..6bc2cc1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -30,7 +30,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 
 /**
  * Factory to create an AsyncRpcRetryCaller.
@@ -138,81 +140,6 @@ class AsyncRpcRetryingCallerFactory {
     return new SingleRequestCallerBuilder<>();
   }
 
-  public class SmallScanCallerBuilder extends BuilderBase {
-
-    private TableName tableName;
-
-    private Scan scan;
-
-    private int limit;
-
-    private long scanTimeoutNs = -1L;
-
-    private long rpcTimeoutNs = -1L;
-
-    public SmallScanCallerBuilder table(TableName tableName) {
-      this.tableName = tableName;
-      return this;
-    }
-
-    public SmallScanCallerBuilder setScan(Scan scan) {
-      this.scan = scan;
-      return this;
-    }
-
-    public SmallScanCallerBuilder limit(int limit) {
-      this.limit = limit;
-      return this;
-    }
-
-    public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
-      this.scanTimeoutNs = unit.toNanos(scanTimeout);
-      return this;
-    }
-
-    public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
-      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
-      return this;
-    }
-
-    public SmallScanCallerBuilder pause(long pause, TimeUnit unit) {
-      this.pauseNs = unit.toNanos(pause);
-      return this;
-    }
-
-    public SmallScanCallerBuilder maxAttempts(int maxAttempts) {
-      this.maxAttempts = maxAttempts;
-      return this;
-    }
-
-    public SmallScanCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
-      this.startLogErrorsCnt = startLogErrorsCnt;
-      return this;
-    }
-
-    public AsyncSmallScanRpcRetryingCaller build() {
-      TableName tableName = checkNotNull(this.tableName, "tableName is null");
-      Scan scan = checkNotNull(this.scan, "scan is null");
-      checkArgument(limit > 0, "invalid limit %d", limit);
-      return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, pauseNs, maxAttempts,
-          scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
-    }
-
-    /**
-     * Shortcut for {@code build().call()}
-     */
-    public CompletableFuture<List<Result>> call() {
-      return build().call();
-    }
-  }
-
-  /**
-   * Create retry caller for small scan.
-   */
-  public SmallScanCallerBuilder smallScan() {
-    return new SmallScanCallerBuilder();
-  }
-
   public class ScanSingleRegionCallerBuilder extends BuilderBase {
 
     private long scannerId = -1L;
@@ -297,10 +224,11 @@ class AsyncRpcRetryingCallerFactory {
     }
 
     /**
-     * Short cut for {@code build().start()}.
+     * Shortcut for {@code build().start(HBaseRpcController, ScanResponse)}.
      */
-    public CompletableFuture<Boolean> start() {
-      return build().start();
+    public CompletableFuture<Boolean> start(HBaseRpcController controller,
+        ScanResponse respWhenOpen) {
+      return build().start(controller, respWhenOpen);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 5d3b736..3ef4a6f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,11 +17,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
 import io.netty.util.HashedWheelTimer;
@@ -135,6 +134,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
   }
 
+  private long remainingTimeNs() {
+    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+  }
+
   private void closeScanner() {
     resetController(controller, rpcTimeoutNs);
     ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
@@ -199,7 +202,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
     long delayNs;
     if (scanTimeoutNs > 0) {
-      long maxDelayNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
       if (maxDelayNs <= 0) {
         completeExceptionally(!scannerClosed);
         return;
@@ -245,7 +248,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
   }
 
-  private void onComplete(ScanResponse resp) {
+  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
     if (controller.failed()) {
       onError(controller.getFailed());
       return;
@@ -288,6 +291,13 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       completeNoMoreResults();
       return;
     }
+    if (scan.getLimit() > 0) {
+      // The RS should have set the moreResults field in ScanResponse to false when we have reached
+      // the limit.
+      int limit = scan.getLimit() - results.length;
+      assert limit > 0;
+      scan.setLimit(limit);
+    }
     // as in 2.0 this value will always be set
     if (!resp.getMoreResultsInRegion()) {
       completeWhenNoMoreResultsInRegion.run();
@@ -297,10 +307,26 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   }
 
   private void call() {
-    resetController(controller, rpcTimeoutNs);
+    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
+    // less than the scan timeout. If the server does not respond in time (usually this will not
+    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
+    // resending the next request and the only way to fix this is to close the scanner and open a
+    // new one.
+    long callTimeoutNs;
+    if (scanTimeoutNs > 0) {
+      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+      if (remainingNs <= 0) {
+        completeExceptionally(true);
+        return;
+      }
+      callTimeoutNs = remainingNs;
+    } else {
+      callTimeoutNs = 0L;
+    }
+    resetController(controller, callTimeoutNs);
     ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
-      nextCallSeq, false, false);
-    stub.scan(controller, req, this::onComplete);
+      nextCallSeq, false, false, scan.getLimit());
+    stub.scan(controller, req, resp -> onComplete(controller, resp));
   }
 
   private void next() {
@@ -312,10 +338,15 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   }
 
   /**
+   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
+   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
+   * open scanner request is also needed because we may have some data in the CellScanner which is
+   * contained in the controller.
    * @return {@code true} if we should continue, otherwise {@code false}.
    */
-  public CompletableFuture<Boolean> start() {
-    next();
+  public CompletableFuture<Boolean> start(HBaseRpcController controller,
+      ScanResponse respWhenOpen) {
+    onComplete(controller, respWhenOpen);
     return future;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
deleted file mode 100644
index 98a276f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
-
-/**
- * Retry caller for smaller scan.
- */
-@InterfaceAudience.Private
-class AsyncSmallScanRpcRetryingCaller {
-
-  private final AsyncConnectionImpl conn;
-
-  private final TableName tableName;
-
-  private final Scan scan;
-
-  private final int limit;
-
-  private final long scanTimeoutNs;
-
-  private final long rpcTimeoutNs;
-
-  private final long pauseNs;
-
-  private final int maxAttempts;
-
-  private final int startLogErrosCnt;
-
-  private final Function<HRegionInfo, Boolean> nextScan;
-
-  private final List<Result> resultList;
-
-  private final CompletableFuture<List<Result>> future;
-
-  public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan,
-      int limit, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
-      int startLogErrosCnt) {
-    this.conn = conn;
-    this.tableName = tableName;
-    this.scan = scan;
-    this.limit = limit;
-    this.scanTimeoutNs = scanTimeoutNs;
-    this.rpcTimeoutNs = rpcTimeoutNs;
-    this.pauseNs = pauseNs;
-    this.maxAttempts = maxAttempts;
-    this.startLogErrosCnt = startLogErrosCnt;
-    if (scan.isReversed()) {
-      this.nextScan = this::reversedNextScan;
-    } else {
-      this.nextScan = this::nextScan;
-    }
-    this.resultList = new ArrayList<>();
-    this.future = new CompletableFuture<>();
-  }
-
-  private static final class SmallScanResponse {
-
-    public final Result[] results;
-
-    public final HRegionInfo currentRegion;
-
-    public final boolean hasMoreResultsInRegion;
-
-    public SmallScanResponse(Result[] results, HRegionInfo currentRegion,
-        boolean hasMoreResultsInRegion) {
-      this.results = results;
-      this.currentRegion = currentRegion;
-      this.hasMoreResultsInRegion = hasMoreResultsInRegion;
-    }
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
-      justification = "Findbugs seems to be confused by lambda expression.")
-  private CompletableFuture<SmallScanResponse> scan(HBaseRpcController controller,
-      HRegionLocation loc, ClientService.Interface stub) {
-    CompletableFuture<SmallScanResponse> future = new CompletableFuture<>();
-    ScanRequest req;
-    try {
-      req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan,
-        limit - resultList.size(), true);
-    } catch (IOException e) {
-      future.completeExceptionally(e);
-      return future;
-    }
-    stub.scan(controller, req, resp -> {
-      if (controller.failed()) {
-        future.completeExceptionally(controller.getFailed());
-      } else {
-        try {
-          Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp);
-          future.complete(
-            new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion()));
-        } catch (IOException e) {
-          future.completeExceptionally(e);
-        }
-      }
-    });
-    return future;
-  }
-
-  private void onComplete(SmallScanResponse resp) {
-    resultList.addAll(Arrays.asList(resp.results));
-    if (resultList.size() == limit) {
-      future.complete(resultList);
-      return;
-    }
-    if (resp.hasMoreResultsInRegion) {
-      if (resp.results.length > 0) {
-        scan.withStartRow(resp.results[resp.results.length - 1].getRow(), false);
-      }
-      scan();
-      return;
-    }
-    if (!nextScan.apply(resp.currentRegion)) {
-      future.complete(resultList);
-    }
-  }
-
-  private void scan() {
-    conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
-        .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrosCnt).action(this::scan).call()
-        .whenComplete((resp, error) -> {
-          if (error != null) {
-            future.completeExceptionally(error);
-          } else {
-            onComplete(resp);
-          }
-        });
-  }
-
-  public CompletableFuture<List<Result>> call() {
-    scan();
-    return future;
-  }
-
-  private boolean nextScan(HRegionInfo info) {
-    if (noMoreResultsForScan(scan, info)) {
-      return false;
-    } else {
-      scan.withStartRow(info.getEndKey());
-      scan();
-      return true;
-    }
-  }
-
-  private boolean reversedNextScan(HRegionInfo info) {
-    if (noMoreResultsForReverseScan(scan, info)) {
-      return false;
-    } else {
-      scan.withStartRow(info.getStartKey(), false);
-      scan();
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index d82fa22..e201ab2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -300,26 +300,34 @@ public interface AsyncTableBase {
       CompareOp compareOp, byte[] value, RowMutations mutation);
 
   /**
-   * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
-   * @see #smallScan(Scan, int)
-   */
-  default CompletableFuture<List<Result>> smallScan(Scan scan) {
-    return smallScan(scan, Integer.MAX_VALUE);
-  }
-
-  /**
-   * Return all the results that match the given scan object. The number of the returned results
-   * will not be greater than {@code limit}.
+   * Return all the results that match the given scan object.
+   * <p>
+   * Notice that usually you should use this method with a {@link Scan} object that has limit set.
+   * For example, if you want to get the closest row after a given row, you could do this:
+   * <p>
+   *
+   * <pre>
+   * <code>
+   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
+   *   if (results.isEmpty()) {
+   *      System.out.println("No row after " + Bytes.toStringBinary(row));
+   *   } else {
+   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
+   *         + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
+   *   }
+   * });
+   * </code>
+   * </pre>
    * <p>
-   * Notice that the scan must be small, and should not use batch or allowPartialResults. The
-   * {@code caching} property of the scan object is also ignored as we will use {@code limit}
-   * instead.
-   * @param scan A configured {@link Scan} object.
-   * @param limit the limit of results count
+   * If your result set is very large, you should use another scan method to get a scanner, or use
+   * a callback to process the results. They will do chunking to prevent OOM. The scanAll method
+   * will fetch all the results, store them in a List and then return the list to you, so if you
+   * use this method to fetch a really large result set, it is likely to cause OOM.
+   * @param scan A configured {@link Scan} object.
    * @return The results of this small scan operation. The return value will be wrapped by a
    *         {@link CompletableFuture}.
    */
-  CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
+  CompletableFuture<List<Result>> scanAll(Scan scan);
 
   /**
    * Test for the existence of columns in the table, as specified by the Gets.

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 7cd257c..f1625ad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -144,8 +144,8 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
-  public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
-    return wrap(rawTable.smallScan(scan, limit));
+  public CompletableFuture<List<Result>> scanAll(Scan scan) {
+    return wrap(rawTable.scanAll(scan));
   }
 
   private long resultSize2CacheSize(long maxResultSize) {
@@ -197,4 +197,5 @@ class AsyncTableImpl implements AsyncTable {
   public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
     return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index d9d2d35..87323ac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -21,13 +21,13 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 @InterfaceAudience.Private
 class RawAsyncTableImpl implements RawAsyncTable {
 
-  private static final Log LOG = LogFactory.getLog(RawAsyncTableImpl.class);
-
   private final AsyncConnectionImpl conn;
 
   private final TableName tableName;
@@ -332,12 +330,6 @@ class RawAsyncTableImpl implements RawAsyncTable {
         .call();
   }
 
-  private <T> CompletableFuture<T> failedFuture(Throwable error) {
-    CompletableFuture<T> future = new CompletableFuture<>();
-    future.completeExceptionally(error);
-    return future;
-  }
-
   private Scan setDefaultScanConfig(Scan scan) {
     // always create a new scan object as we may reset the start row later.
     Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
@@ -351,27 +343,35 @@ class RawAsyncTableImpl implements RawAsyncTable {
   }
 
   @Override
-  public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
-    if (!scan.isSmall()) {
-      return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
-    }
-    if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
-      return failedFuture(
-        new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
-    }
-    return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
-        .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
-        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+  public CompletableFuture<List<Result>> scanAll(Scan scan) {
+    CompletableFuture<List<Result>> future = new CompletableFuture<>();
+    List<Result> scanResults = new ArrayList<>();
+    scan(scan, new RawScanResultConsumer() {
+
+      @Override
+      public boolean onNext(Result[] results) {
+        scanResults.addAll(Arrays.asList(results));
+        return true;
+      }
+
+      @Override
+      public void onError(Throwable error) {
+        future.completeExceptionally(error);
+      }
+
+      @Override
+      public void onComplete() {
+        future.complete(scanResults);
+      }
+    });
+    return future;
   }
 
   public void scan(Scan scan, RawScanResultConsumer consumer) {
-    if (scan.isSmall()) {
+    if (scan.isSmall() || scan.getLimit() > 0) {
       if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
-        consumer.onError(
-          new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
-      } else {
-        LOG.warn("This is small scan " + scan + ", consider using smallScan directly?");
+        consumer.onError(new IllegalArgumentException(
+            "Batch and allowPartial is not allowed for small scan or limited scan"));
       }
     }
     scan = setDefaultScanConfig(scan);
@@ -388,6 +388,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   public List<CompletableFuture<Void>> put(List<Put> puts) {
     return voidMutate(puts);
   }
+
   @Override
   public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
     return voidMutate(deletes);
@@ -434,4 +435,5 @@ class RawAsyncTableImpl implements RawAsyncTable {
   public long getScanTimeout(TimeUnit unit) {
     return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
   }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
index 7f0514c..2e5d422 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
@@ -47,7 +47,9 @@ public interface RawScanResultConsumer {
    * This method give you a chance to terminate a slow scan operation.
    * @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
    */
-  boolean onHeartbeat();
+  default boolean onHeartbeat() {
+    return true;
+  }
 
   /**
    * Indicate that we hit an unrecoverable error and the scan operation is terminated.

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 8d53b9a..31e76da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -46,38 +46,45 @@ import org.apache.hadoop.hbase.util.Bytes;
 /**
  * Used to perform Scan operations.
  * <p>
- * All operations are identical to {@link Get} with the exception of
- * instantiation.  Rather than specifying a single row, an optional startRow
- * and stopRow may be defined.  If rows are not specified, the Scanner will
- * iterate over all rows.
+ * All operations are identical to {@link Get} with the exception of instantiation. Rather than
+ * specifying a single row, an optional startRow and stopRow may be defined. If rows are not
+ * specified, the Scanner will iterate over all rows.
  * <p>
  * To get all columns from all rows of a Table, create an instance with no constraints; use the
- * {@link #Scan()} constructor. To constrain the scan to specific column families,
- * call {@link #addFamily(byte[]) addFamily} for each family to retrieve on your Scan instance.
+ * {@link #Scan()} constructor. To constrain the scan to specific column families, call
+ * {@link #addFamily(byte[]) addFamily} for each family to retrieve on your Scan instance.
  * <p>
- * To get specific columns, call {@link #addColumn(byte[], byte[]) addColumn}
- * for each column to retrieve.
+ * To get specific columns, call {@link #addColumn(byte[], byte[]) addColumn} for each column to
+ * retrieve.
  * <p>
- * To only retrieve columns within a specific range of version timestamps,
- * call {@link #setTimeRange(long, long) setTimeRange}.
+ * To only retrieve columns within a specific range of version timestamps, call
+ * {@link #setTimeRange(long, long) setTimeRange}.
  * <p>
- * To only retrieve columns with a specific timestamp, call
- * {@link #setTimeStamp(long) setTimestamp}.
+ * To only retrieve columns with a specific timestamp, call {@link #setTimeStamp(long)
+ * setTimestamp}.
  * <p>
- * To limit the number of versions of each column to be returned, call
- * {@link #setMaxVersions(int) setMaxVersions}.
+ * To limit the number of versions of each column to be returned, call {@link #setMaxVersions(int)
+ * setMaxVersions}.
  * <p>
- * To limit the maximum number of values returned for each call to next(),
- * call {@link #setBatch(int) setBatch}.
+ * To limit the maximum number of values returned for each call to next(), call
+ * {@link #setBatch(int) setBatch}.
  * <p>
  * To add a filter, call {@link #setFilter(org.apache.hadoop.hbase.filter.Filter) setFilter}.
  * <p>
- * Expert: To explicitly disable server-side block caching for this scan,
- * execute {@link #setCacheBlocks(boolean)}.
- * <p><em>Note:</em> Usage alters Scan instances. Internally, attributes are updated as the Scan
- * runs and if enabled, metrics accumulate in the Scan instance. Be aware this is the case when
- * you go to clone a Scan instance or if you go to reuse a created Scan instance; safer is create
- * a Scan instance per usage.
+ * Small scan is deprecated since 2.0.0. Scan now has a {@link #setLimit(int)} method which tells
+ * the RS how many rows we want. Once the number of returned rows reaches the limit, the RS closes
+ * the RegionScanner automatically. The new implementation also fetches data when opening the
+ * scanner, so a scan operation can also finish in one rpc call. There is also a
+ * {@link #setReadType(ReadType)} method, which you can use to tell the RS to use pread
+ * explicitly.
+ * <p>
+ * Expert: To explicitly disable server-side block caching for this scan, execute
+ * {@link #setCacheBlocks(boolean)}.
+ * <p>
+ * <em>Note:</em> Usage alters Scan instances. Internally, attributes are updated as the Scan runs
+ * and if enabled, metrics accumulate in the Scan instance. Be aware this is the case when you go to
+ * clone a Scan instance or if you go to reuse a created Scan instance; safer is create a Scan
+ * instance per usage.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -86,9 +93,9 @@ public class Scan extends Query {
 
   private static final String RAW_ATTR = "_raw_";
 
-  private byte [] startRow = HConstants.EMPTY_START_ROW;
+  private byte[] startRow = HConstants.EMPTY_START_ROW;
   private boolean includeStartRow = true;
-  private byte [] stopRow  = HConstants.EMPTY_END_ROW;
+  private byte[] stopRow  = HConstants.EMPTY_END_ROW;
   private boolean includeStopRow = false;
   private int maxVersions = 1;
   private int batch = -1;
@@ -172,6 +179,16 @@ public class Scan extends Query {
   private long mvccReadPoint = -1L;
 
   /**
+   * The number of rows we want for this scan. We will terminate the scan if the number of
+   * returned rows reaches this value.
+   */
+  private int limit = -1;
+
+  /**
+   * Control whether to use pread at server side.
+   */
+  private ReadType readType = ReadType.DEFAULT;
+  /**
    * Create a Scan operation across all rows.
    */
   public Scan() {}
@@ -257,6 +274,7 @@ public class Scan extends Query {
       setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
     }
     this.mvccReadPoint = scan.getMvccReadPoint();
+    this.limit = scan.getLimit();
   }
 
   /**
@@ -969,37 +987,36 @@ public class Scan extends Query {
     return attr == null ? false : Bytes.toBoolean(attr);
   }
 
-
-
   /**
    * Set whether this scan is a small scan
    * <p>
-   * Small scan should use pread and big scan can use seek + read
-   *
-   * seek + read is fast but can cause two problem (1) resource contention (2)
-   * cause too much network io
-   *
-   * [89-fb] Using pread for non-compaction read request
-   * https://issues.apache.org/jira/browse/HBASE-7266
-   *
-   * On the other hand, if setting it true, we would do
-   * openScanner,next,closeScanner in one RPC call. It means the better
-   * performance for small scan. [HBASE-9488].
-   *
-   * Generally, if the scan range is within one data block(64KB), it could be
-   * considered as a small scan.
-   *
+   * Small scan should use pread and big scan can use seek + read. seek + read is fast but can
+   * cause two problems: (1) resource contention, (2) too much network io. See [89-fb] Using pread
+   * for non-compaction read request (https://issues.apache.org/jira/browse/HBASE-7266). On the
+   * other hand, if setting it true, we would do openScanner,next,closeScanner in one RPC call. It
+   * means better performance for small scan [HBASE-9488]. Generally, if the scan range is within
+   * one data block(64KB), it could be considered as a small scan.
    * @param small
+   * @deprecated since 2.0.0. Use {@link #setLimit(int)} and {@link #setReadType(ReadType)} instead.
+   *             As for the one rpc optimization, we now also fetch data when opening the scanner,
+   *             and if the number of rows reaches the limit we close the scanner automatically,
+   *             which means the scan still completes in one rpc.
+   * @see #setLimit(int)
+   * @see #setReadType(ReadType)
    */
+  @Deprecated
   public Scan setSmall(boolean small) {
     this.small = small;
+    this.readType = ReadType.PREAD;
     return this;
   }
 
   /**
    * Get whether this scan is a small scan
    * @return true if small scan
+   * @deprecated since 2.0.0. See the comment of {@link #setSmall(boolean)}
    */
+  @Deprecated
   public boolean isSmall() {
     return small;
   }
@@ -1081,6 +1098,53 @@ public class Scan extends Query {
   }
 
   /**
+   * @return the limit of rows for this scan
+   */
+  public int getLimit() {
+    return limit;
+  }
+
+  /**
+   * Set the limit of rows for this scan. We will terminate the scan if the number of returned rows
+   * reaches this value.
+   * <p>
+   * This condition will be tested last, after all other conditions such as stopRow, filter, etc.
+   * <p>
+   * Can not be used together with batch and allowPartial.
+   * @param limit the limit of rows for this scan
+   * @return this
+   */
+  public Scan setLimit(int limit) {
+    this.limit = limit;
+    return this;
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public enum ReadType {
+    DEFAULT, STREAM, PREAD
+  }
+
+  /**
+   * @return the read type for this scan
+   */
+  public ReadType getReadType() {
+    return readType;
+  }
+
+  /**
+   * Set the read type for this scan.
+   * <p>
+   * Notice that we may choose to use pread even if you specify {@link ReadType#STREAM} here. For
+   * example, we will always use pread if this is a get scan.
+   * @return this
+   */
+  public Scan setReadType(ReadType readType) {
+    this.readType = readType;
+    return this;
+  }
+
+  /**
    * Get the mvcc read point used to open a scanner.
    */
   long getMvccReadPoint() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 642fae0..f867acb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -194,7 +194,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
         try {
           incRPCcallsMetrics();
           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
-                this.scanMetrics != null, renew);
+            this.scanMetrics != null, renew, -1);
           ScanResponse response = null;
           response = getStub().scan(getRpcController(), request);
           // Client and RS maintain a nextCallSeq number during the scan. Every next() call

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index d3898d4..51a94ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -814,6 +814,32 @@ public final class ProtobufUtil {
     return get;
   }
 
+  public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) {
+    switch (readType) {
+      case DEFAULT:
+        return ClientProtos.Scan.ReadType.DEFAULT;
+      case STREAM:
+        return ClientProtos.Scan.ReadType.STREAM;
+      case PREAD:
+        return ClientProtos.Scan.ReadType.PREAD;
+      default:
+        throw new IllegalArgumentException("Unknown ReadType: " + readType);
+    }
+  }
+
+  public static Scan.ReadType toReadType(ClientProtos.Scan.ReadType readType) {
+    switch (readType) {
+      case DEFAULT:
+        return Scan.ReadType.DEFAULT;
+      case STREAM:
+        return Scan.ReadType.STREAM;
+      case PREAD:
+        return Scan.ReadType.PREAD;
+      default:
+        throw new IllegalArgumentException("Unknown ReadType: " + readType);
+    }
+  }
+
   /**
    * Convert a client Scan to a protocol buffer Scan
    *
@@ -917,6 +943,9 @@ public final class ProtobufUtil {
     if (scan.includeStopRow()) {
       scanBuilder.setIncludeStopRow(true);
     }
+    if (scan.getReadType() != Scan.ReadType.DEFAULT) {
+      scanBuilder.setReadType(toReadType(scan.getReadType()));
+    }
     return scanBuilder.build();
   }
 
@@ -1015,6 +1044,11 @@ public final class ProtobufUtil {
     if (proto.hasMvccReadPoint()) {
       PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint());
     }
+    if (scan.isSmall()) {
+      scan.setReadType(Scan.ReadType.PREAD);
+    } else if (proto.hasReadType()) {
+      scan.setReadType(toReadType(proto.getReadType()));
+    }
     return scan;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 7764f65..13ff92e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLoadStats;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.SnapshotType;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@@ -928,6 +929,32 @@ public final class ProtobufUtil {
     return get;
   }
 
+  public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) {
+    switch (readType) {
+      case DEFAULT:
+        return ClientProtos.Scan.ReadType.DEFAULT;
+      case STREAM:
+        return ClientProtos.Scan.ReadType.STREAM;
+      case PREAD:
+        return ClientProtos.Scan.ReadType.PREAD;
+      default:
+        throw new IllegalArgumentException("Unknown ReadType: " + readType);
+    }
+  }
+
+  public static Scan.ReadType toReadType(ClientProtos.Scan.ReadType readType) {
+    switch (readType) {
+      case DEFAULT:
+        return Scan.ReadType.DEFAULT;
+      case STREAM:
+        return Scan.ReadType.STREAM;
+      case PREAD:
+        return Scan.ReadType.PREAD;
+      default:
+        throw new IllegalArgumentException("Unknown ReadType: " + readType);
+    }
+  }
+
   /**
    * Convert a client Scan to a protocol buffer Scan
    *
@@ -1031,6 +1058,9 @@ public final class ProtobufUtil {
     if (scan.includeStopRow()) {
       scanBuilder.setIncludeStopRow(true);
     }
+    if (scan.getReadType() != Scan.ReadType.DEFAULT) {
+      scanBuilder.setReadType(toReadType(scan.getReadType()));
+    }
     return scanBuilder.build();
   }
 
@@ -1129,6 +1159,11 @@ public final class ProtobufUtil {
     if (proto.hasMvccReadPoint()) {
       PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint());
     }
+    if (scan.isSmall()) {
+      scan.setReadType(Scan.ReadType.PREAD);
+    } else if (proto.hasReadType()) {
+      scan.setReadType(toReadType(proto.getReadType()));
+    }
     return scan;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index fd08d98..8de9ad8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -462,11 +462,10 @@ public final class RequestConverter {
    * @return a scan request
    * @throws IOException
    */
-  public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan,
-      final int numberOfRows, final boolean closeScanner) throws IOException {
+  public static ScanRequest buildScanRequest(byte[] regionName, Scan scan, int numberOfRows,
+      boolean closeScanner) throws IOException {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
-    RegionSpecifier region = buildRegionSpecifier(
-      RegionSpecifierType.REGION_NAME, regionName);
+    RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
     builder.setNumberOfRows(numberOfRows);
     builder.setCloseScanner(closeScanner);
     builder.setRegion(region);
@@ -474,19 +473,21 @@ public final class RequestConverter {
     builder.setClientHandlesPartials(true);
     builder.setClientHandlesHeartbeats(true);
     builder.setTrackScanMetrics(scan.isScanMetricsEnabled());
+    if (scan.getLimit() > 0) {
+      builder.setLimitOfRows(scan.getLimit());
+    }
     return builder.build();
   }
 
   /**
    * Create a protocol buffer ScanRequest for a scanner id
-   *
    * @param scannerId
    * @param numberOfRows
    * @param closeScanner
    * @return a scan request
    */
-  public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
-      final boolean closeScanner, final boolean trackMetrics) {
+  public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner,
+      boolean trackMetrics) {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
     builder.setNumberOfRows(numberOfRows);
     builder.setCloseScanner(closeScanner);
@@ -499,16 +500,14 @@ public final class RequestConverter {
 
   /**
    * Create a protocol buffer ScanRequest for a scanner id
-   *
    * @param scannerId
    * @param numberOfRows
    * @param closeScanner
    * @param nextCallSeq
    * @return a scan request
    */
-  public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
-      final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics,
-      final boolean renew) {
+  public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner,
+      long nextCallSeq, boolean trackMetrics, boolean renew, int limitOfRows) {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
     builder.setNumberOfRows(numberOfRows);
     builder.setCloseScanner(closeScanner);
@@ -518,6 +517,9 @@ public final class RequestConverter {
     builder.setClientHandlesHeartbeats(true);
     builder.setTrackScanMetrics(trackMetrics);
     builder.setRenew(renew);
+    if (limitOfRows > 0) {
+      builder.setLimitOfRows(limitOfRows);
+    }
     return builder.build();
   }
 


Mime
View raw message