hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-13335 Use serverHasMoreResults context in SmallScanner and SmallReversedScanner.
Date Wed, 01 Apr 2015 04:36:51 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 35fdea7d0 -> 79d80b187


HBASE-13335 Use serverHasMoreResults context in SmallScanner and SmallReversedScanner.

Use the context passed back via ScanResponse that a RegionServer
fills in to denote whether or not more results existing in the
current Region. Add a simple factory to remove a static method
used across both SmallScanner and SmallReverseScanner. Add new
unit tests for both scanner classes to test scans with and
without the new context (as a quick backward-compatibility test).


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/79d80b18
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/79d80b18
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/79d80b18

Branch: refs/heads/branch-1
Commit: 79d80b187e2a625e2ddfacacb210373ce3641f23
Parents: 35fdea7
Author: Josh Elser <elserj@apache.org>
Authored: Mon Mar 30 16:55:47 2015 -0400
Committer: Enis Soztutar <enis@apache.org>
Committed: Tue Mar 31 21:35:34 2015 -0700

----------------------------------------------------------------------
 .../client/ClientSmallReversedScanner.java      | 181 +++++++---
 .../hadoop/hbase/client/ClientSmallScanner.java | 205 +++++++----
 .../client/TestClientSmallReversedScanner.java  | 349 +++++++++++++++++++
 .../hbase/client/TestClientSmallScanner.java    | 339 ++++++++++++++++++
 4 files changed, 945 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/79d80b18/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
index 1e94820..18db925 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
@@ -28,9 +28,13 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
+import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
@@ -46,32 +50,83 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
   private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
   private ScannerCallableWithReplicas smallScanCallable = null;
   private byte[] skipRowOfFirstResult = null;
+  private SmallScannerCallableFactory callableFactory;
 
   /**
-   * Create a new ReversibleClientScanner for the specified table Note that the
-   * passed {@link Scan}'s start row maybe changed changed.
+   * Create a new ReversibleClientScanner for the specified table. Take note that the passed
+   * {@link Scan} 's start row maybe changed changed.
    *
-   * @param conf The {@link Configuration} to use.
-   * @param scan {@link Scan} to use in this scanner
-   * @param tableName The table that we wish to rangeGet
-   * @param connection Connection identifying the cluster
+   * @param conf
+   *          The {@link Configuration} to use.
+   * @param scan
+   *          {@link Scan} to use in this scanner
+   * @param tableName
+   *          The table that we wish to rangeGet
+   * @param connection
+   *          Connection identifying the cluster
    * @param rpcFactory
+   *          Factory used to create the {@link RpcRetryingCaller}
+   * @param controllerFactory
+   *          Factory used to access RPC payloads
+   * @param pool
+   *          Threadpool for RPC threads
+   * @param primaryOperationTimeout
+   *          Call timeout
    * @throws IOException
+   *           If the remote call fails
    */
   public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
-      final TableName tableName, ClusterConnection connection,
-      RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
-      ExecutorService pool, int primaryOperationTimeout) throws IOException {
-    super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout);
+      final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
+      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
+      throws IOException {
+    this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
+        primaryOperationTimeout, new SmallScannerCallableFactory());
+  }
+
+  /**
+   * Create a new ReversibleClientScanner for the specified table. Take note that the passed
+   * {@link Scan}'s start row may be changed.
+   *
+   * @param conf
+   *          The {@link Configuration} to use.
+   * @param scan
+   *          {@link Scan} to use in this scanner
+   * @param tableName
+   *          The table that we wish to rangeGet
+   * @param connection
+   *          Connection identifying the cluster
+   * @param rpcFactory
+   *          Factory used to create the {@link RpcRetryingCaller}
+   * @param controllerFactory
+   *          Factory used to access RPC payloads
+   * @param pool
+   *          Threadpool for RPC threads
+   * @param primaryOperationTimeout
+   *          Call timeout
+   * @param callableFactory
+   *          Factory used to create the {@link SmallScannerCallable}
+   * @throws IOException
+   *           If the remote call fails
+   */
+  @VisibleForTesting
+  ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
+      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
+      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
+      SmallScannerCallableFactory callableFactory) throws IOException {
+    super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
+        primaryOperationTimeout);
+    this.callableFactory = callableFactory;
   }
 
   /**
-   * Gets a scanner for following scan. Move to next region or continue from the
-   * last result or start from the start row.
+   * Gets a scanner for following scan. Move to next region or continue from the last result or
+   * start from the start row.
    *
    * @param nbRows
-   * @param done              true if Server-side says we're done scanning.
-   * @param currentRegionDone true if scan is over on current region
+   * @param done
+   *          true if Server-side says we're done scanning.
+   * @param currentRegionDone
+   *          true if scan is over on current region
    * @return true if has next scanner
    * @throws IOException
    */
@@ -111,10 +166,9 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
           + Bytes.toStringBinary(localStartKey) + "'");
     }
 
-    smallScanCallable = ClientSmallScanner.getSmallScanCallable(
-      getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum,
-      rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
-      getRetries(), getScannerTimeout(), getConf(), caller);
+    smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
+        getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
+        getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
 
     if (this.scanMetrics != null && skipRowOfFirstResult == null) {
       this.scanMetrics.countOfRegions.incrementAndGet();
@@ -130,46 +184,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
       return null;
     }
     if (cache.size() == 0) {
-      Result[] values = null;
-      long remainingResultSize = maxScannerResultSize;
-      int countdown = this.caching;
-      boolean currentRegionDone = false;
-      // Values == null means server-side filter has determined we must STOP
-      while (remainingResultSize > 0 && countdown > 0
-          && nextScanner(countdown, values == null, currentRegionDone)) {
-        // Server returns a null values if scanning is to stop. Else,
-        // returns an empty array if scanning is to go on and we've just
-        // exhausted current region.
-        // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
-        // we do a callWithRetries
-        // TODO use context from server
-        values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
-        this.currentRegion = smallScanCallable.getHRegionInfo();
-        long currentTime = System.currentTimeMillis();
-        if (this.scanMetrics != null) {
-          this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
-              - lastNext);
-        }
-        lastNext = currentTime;
-        if (values != null && values.length > 0) {
-          for (int i = 0; i < values.length; i++) {
-            Result rs = values[i];
-            if (i == 0 && this.skipRowOfFirstResult != null
-                && Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
-              // Skip the first result
-              continue;
-            }
-            cache.add(rs);
-            // We don't make Iterator here
-            for (Cell cell : rs.rawCells()) {
-              remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
-            }
-            countdown--;
-            this.lastResult = rs;
-          }
-        }
-        currentRegionDone = countdown > 0;
-      }
+      loadCache();
     }
 
     if (cache.size() > 0) {
@@ -181,6 +196,52 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
     return null;
   }
 
+  @Override
+  protected void loadCache() throws IOException {
+    Result[] values = null;
+    long remainingResultSize = maxScannerResultSize;
+    int countdown = this.caching;
+    boolean currentRegionDone = false;
+    // Values == null means server-side filter has determined we must STOP
+    while (remainingResultSize > 0 && countdown > 0
+        && nextScanner(countdown, values == null, currentRegionDone)) {
+      // Server returns a null values if scanning is to stop. Else,
+      // returns an empty array if scanning is to go on and we've just
+      // exhausted current region.
+      // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
+      // we do a callWithRetries
+      values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
+      this.currentRegion = smallScanCallable.getHRegionInfo();
+      long currentTime = System.currentTimeMillis();
+      if (this.scanMetrics != null) {
+        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
+            - lastNext);
+      }
+      lastNext = currentTime;
+      if (values != null && values.length > 0) {
+        for (int i = 0; i < values.length; i++) {
+          Result rs = values[i];
+          if (i == 0 && this.skipRowOfFirstResult != null
+              && Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
+            // Skip the first result
+            continue;
+          }
+          cache.add(rs);
+          // We don't make Iterator here
+          for (Cell cell : rs.rawCells()) {
+            remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
+          }
+          countdown--;
+          this.lastResult = rs;
+        }
+      }
+      if (smallScanCallable.hasMoreResultsContext()) {
+        currentRegionDone = !smallScanCallable.getServerHasMoreResults();
+      } else {
+        currentRegionDone = countdown > 0;
+      }
+    }
+  }
 
   @Override
   protected void initializeScannerInConstruction() throws IOException {
@@ -194,4 +255,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
     closed = true;
   }
 
+  @VisibleForTesting
+  protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
+    this.callableFactory = callableFactory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d80b18/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index 48a8af0..426eeb1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -55,26 +56,72 @@ public class ClientSmallScanner extends ClientScanner {
   // When fetching results from server, skip the first result if it has the same
   // row with this one
   private byte[] skipRowOfFirstResult = null;
+  private SmallScannerCallableFactory callableFactory;
 
   /**
-   * Create a new ShortClientScanner for the specified table Note that the
-   * passed {@link Scan}'s start row maybe changed changed.
+   * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
+   * 's start row maybe changed changed.
    *
-   * @param conf The {@link Configuration} to use.
-   * @param scan {@link Scan} to use in this scanner
-   * @param tableName The table that we wish to rangeGet
-   * @param connection Connection identifying the cluster
+   * @param conf
+   *          The {@link Configuration} to use.
+   * @param scan
+   *          {@link Scan} to use in this scanner
+   * @param tableName
+   *          The table that we wish to rangeGet
+   * @param connection
+   *          Connection identifying the cluster
    * @param rpcFactory
+   *          Factory used to create the {@link RpcRetryingCaller}
+   * @param controllerFactory
+   *          Factory used to access RPC payloads
    * @param pool
+   *          Threadpool for RPC threads
    * @param primaryOperationTimeout
+   *          Call timeout
    * @throws IOException
+   *           If the remote call fails
    */
-  public ClientSmallScanner(final Configuration conf, final Scan scan,
-      final TableName tableName, ClusterConnection connection,
-      RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
-      ExecutorService pool, int primaryOperationTimeout) throws IOException {
+  public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
+      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
+      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
+      throws IOException {
+    this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
+        primaryOperationTimeout, new SmallScannerCallableFactory());
+  }
+
+  /**
+   * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
+   * 's start row maybe changed changed. Intended for unit tests to provide their own
+   * {@link SmallScannerCallableFactory} implementation/mock.
+   *
+   * @param conf
+   *          The {@link Configuration} to use.
+   * @param scan
+   *          {@link Scan} to use in this scanner
+   * @param tableName
+   *          The table that we wish to rangeGet
+   * @param connection
+   *          Connection identifying the cluster
+   * @param rpcFactory
+   *          Factory used to create the {@link RpcRetryingCaller}
+   * @param controllerFactory
+   *          Factory used to access RPC payloads
+   * @param pool
+   *          Threadpool for RPC threads
+   * @param primaryOperationTimeout
+   *          Call timeout
+   * @param callableFactory
+   *          Factory used to create the {@link SmallScannerCallable}
+   * @throws IOException
+   */
+  @VisibleForTesting
+  ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
+      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
+      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
+      SmallScannerCallableFactory callableFactory) throws IOException {
     super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-           primaryOperationTimeout);
+        primaryOperationTimeout);
+    this.callableFactory = callableFactory;
   }
 
   @Override
@@ -125,32 +172,15 @@ public class ClientSmallScanner extends ClientScanner {
       LOG.trace("Advancing internal small scanner to startKey at '"
           + Bytes.toStringBinary(localStartKey) + "'");
     }
-    smallScanCallable = getSmallScanCallable(
-        getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum,
-        rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
-        getRetries(), getScannerTimeout(), getConf(), caller);
+    smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
+        getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
+        getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
     if (this.scanMetrics != null && skipRowOfFirstResult == null) {
       this.scanMetrics.countOfRegions.incrementAndGet();
     }
     return true;
   }
 
-
-  static ScannerCallableWithReplicas getSmallScanCallable(ClusterConnection connection,
-      TableName table, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey,
-      final int cacheNum, RpcControllerFactory controllerFactory, ExecutorService pool,
-      int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
-      RpcRetryingCaller<Result[]> caller) {
-    scan.setStartRow(localStartKey);
-    SmallScannerCallable s = new SmallScannerCallable(
-      connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0);
-    ScannerCallableWithReplicas scannerCallableWithReplicas =
-        new ScannerCallableWithReplicas(table, connection,
-            s, pool, primaryOperationTimeout, scan, retries,
-            scannerTimeout, cacheNum, conf, caller);
-    return scannerCallableWithReplicas;
-  }
-
   static class SmallScannerCallable extends ScannerCallable {
     public SmallScannerCallable(
         ClusterConnection connection, TableName table, Scan scan,
@@ -202,46 +232,7 @@ public class ClientSmallScanner extends ClientScanner {
       return null;
     }
     if (cache.size() == 0) {
-      Result[] values = null;
-      long remainingResultSize = maxScannerResultSize;
-      int countdown = this.caching;
-      boolean currentRegionDone = false;
-      // Values == null means server-side filter has determined we must STOP
-      while (remainingResultSize > 0 && countdown > 0
-          && nextScanner(countdown, values == null, currentRegionDone)) {
-        // Server returns a null values if scanning is to stop. Else,
-        // returns an empty array if scanning is to go on and we've just
-        // exhausted current region.
-        // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
-        // we do a callWithRetries
-        // TODO Use the server's response about more results
-        values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
-        this.currentRegion = smallScanCallable.getHRegionInfo();
-        long currentTime = System.currentTimeMillis();
-        if (this.scanMetrics != null) {
-          this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
-              - lastNext);
-        }
-        lastNext = currentTime;
-        if (values != null && values.length > 0) {
-          for (int i = 0; i < values.length; i++) {
-            Result rs = values[i];
-            if (i == 0 && this.skipRowOfFirstResult != null
-                && Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
-              // Skip the first result
-              continue;
-            }
-            cache.add(rs);
-            // We don't make Iterator here
-            for (Cell cell : rs.rawCells()) {
-              remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
-            }
-            countdown--;
-            this.lastResult = rs;
-          }
-        }
-        currentRegionDone = countdown > 0;
-      }
+      loadCache();
     }
 
     if (cache.size() > 0) {
@@ -254,8 +245,80 @@ public class ClientSmallScanner extends ClientScanner {
   }
 
   @Override
+  protected void loadCache() throws IOException {
+    Result[] values = null;
+    long remainingResultSize = maxScannerResultSize;
+    int countdown = this.caching;
+    boolean currentRegionDone = false;
+    // Values == null means server-side filter has determined we must STOP
+    while (remainingResultSize > 0 && countdown > 0
+        && nextScanner(countdown, values == null, currentRegionDone)) {
+      // Server returns a null values if scanning is to stop. Else,
+      // returns an empty array if scanning is to go on and we've just
+      // exhausted current region.
+      // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
+      // we do a callWithRetries
+      values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
+      this.currentRegion = smallScanCallable.getHRegionInfo();
+      long currentTime = System.currentTimeMillis();
+      if (this.scanMetrics != null) {
+        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
+            - lastNext);
+      }
+      lastNext = currentTime;
+      if (values != null && values.length > 0) {
+        for (int i = 0; i < values.length; i++) {
+          Result rs = values[i];
+          if (i == 0 && this.skipRowOfFirstResult != null
+              && Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
+            // Skip the first result
+            continue;
+          }
+          cache.add(rs);
+          // We don't make Iterator here
+          for (Cell cell : rs.rawCells()) {
+            remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
+          }
+          countdown--;
+          this.lastResult = rs;
+        }
+      }
+      if (smallScanCallable.hasMoreResultsContext()) {
+        // If the server has more results, the current region is not done
+        currentRegionDone = !smallScanCallable.getServerHasMoreResults();
+      } else {
+        // not guaranteed to get the context in older versions, fall back to checking countdown
+        currentRegionDone = countdown > 0;
+      }
+    }
+  }
+
   public void close() {
     if (!scanMetricsPublished) writeScanMetrics();
     closed = true;
   }
+
+  @VisibleForTesting
+  protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
+    this.callableFactory = callableFactory;
+  }
+
+  @InterfaceAudience.Private
+  protected static class SmallScannerCallableFactory {
+
+    public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
+        Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
+        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
+        int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller) {
+      scan.setStartRow(localStartKey);
+      SmallScannerCallable s = new SmallScannerCallable(
+        connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0);
+      ScannerCallableWithReplicas scannerCallableWithReplicas =
+          new ScannerCallableWithReplicas(table, connection,
+              s, pool, primaryOperationTimeout, scan, retries,
+              scannerTimeout, cacheNum, conf, caller);
+      return scannerCallableWithReplicas;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d80b18/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
new file mode 100644
index 0000000..4611d08
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test the ClientSmallReversedScanner.
+ */
+@Category(SmallTests.class)
+public class TestClientSmallReversedScanner {
+
+  Scan scan;
+  ExecutorService pool;
+  Configuration conf;
+
+  ClusterConnection clusterConn;
+  RpcRetryingCallerFactory rpcFactory;
+  RpcControllerFactory controllerFactory;
+  RpcRetryingCaller<Result[]> caller;
+
+  @Before
+  @SuppressWarnings({"deprecation", "unchecked"})
+  public void setup() throws IOException {
+    clusterConn = Mockito.mock(ClusterConnection.class);
+    rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
+    controllerFactory = Mockito.mock(RpcControllerFactory.class);
+    pool = Executors.newSingleThreadExecutor();
+    scan = new Scan();
+    conf = new Configuration();
+    Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
+    // Mock out the RpcCaller
+    caller = Mockito.mock(RpcRetryingCaller.class);
+    // Return the mock from the factory
+    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
+  }
+
+  @After
+  public void teardown() {
+    if (null != pool) {
+      pool.shutdownNow();
+    }
+  }
+
+  /**
+   * Create a simple Answer which returns true the first time, and false every time after.
+   */
+  private Answer<Boolean> createTrueThenFalseAnswer() {
+    return new Answer<Boolean>() {
+      boolean first = true;
+
+      @Override
+      public Boolean answer(InvocationOnMock invocation) {
+        if (first) {
+          first = false;
+          return true;
+        }
+        return false;
+      }
+    };
+  }
+
+  private SmallScannerCallableFactory getFactory(
+      final ScannerCallableWithReplicas callableWithReplicas) {
+    return new SmallScannerCallableFactory() {
+      @Override
+      public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
+          Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
+          RpcControllerFactory controllerFactory, ExecutorService pool,
+          int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
+          RpcRetryingCaller<Result[]> caller) {
+        return callableWithReplicas;
+      }
+    };
+  }
+
+  @Test
+  public void testContextPresent() throws Exception {
+    final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum);
+
+    ScannerCallableWithReplicas callableWithReplicas = Mockito
+        .mock(ScannerCallableWithReplicas.class);
+
+    // Mock out the RpcCaller
+    @SuppressWarnings("unchecked")
+    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
+    // Return the mock from the factory
+    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
+
+    // Intentionally leave a "default" caching size in the Scan. No matter the value, we
+    // should continue based on the server context
+
+    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+
+    try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
+        TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
+        Integer.MAX_VALUE)) {
+
+      csrs.setScannerCallableFactory(factory);
+
+      // Return some data the first time, less the second, and none after that
+      Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
+          .thenAnswer(new Answer<Result[]>() {
+            int count = 0;
+
+            @Override
+            public Result[] answer(InvocationOnMock invocation) {
+              Result[] results;
+              if (0 == count) {
+                results = new Result[] {Result.create(new Cell[] {kv3}),
+                    Result.create(new Cell[] {kv2})};
+              } else if (1 == count) {
+                results = new Result[] {Result.create(new Cell[] {kv1})};
+              } else {
+                results = new Result[0];
+              }
+              count++;
+              return results;
+            }
+          });
+
+      // Pass back the context always
+      Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
+      // Only have more results the first time
+      Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
+          createTrueThenFalseAnswer());
+
+      // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
+      HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
+      Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
+      // Trigger the "no more data" branch for #nextScanner(...)
+      Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
+
+      csrs.loadCache();
+
+      List<Result> results = csrs.cache;
+      Iterator<Result> iter = results.iterator();
+      assertEquals(3, results.size());
+      for (int i = 3; i >= 1 && iter.hasNext(); i--) {
+        Result result = iter.next();
+        byte[] row = result.getRow();
+        assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
+        assertEquals(1, result.getMap().size());
+      }
+      assertTrue(csrs.closed);
+    }
+  }
+
+  @Test
+  public void testNoContextFewerRecords() throws Exception {
+    final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum);
+
+    ScannerCallableWithReplicas callableWithReplicas = Mockito
+        .mock(ScannerCallableWithReplicas.class);
+
+    // While the server returns 2 records per batch, we expect more records.
+    scan.setCaching(2);
+
+    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+
+    try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
+        TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
+        Integer.MAX_VALUE)) {
+
+      csrs.setScannerCallableFactory(factory);
+
+      // Return some data the first time, less the second, and none after that
+      Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
+          .thenAnswer(new Answer<Result[]>() {
+            int count = 0;
+
+            @Override
+            public Result[] answer(InvocationOnMock invocation) {
+              Result[] results;
+              if (0 == count) {
+                results = new Result[] {Result.create(new Cell[] {kv3}),
+                    Result.create(new Cell[] {kv2})};
+              } else if (1 == count) {
+                // Return fewer records than expected (2)
+                results = new Result[] {Result.create(new Cell[] {kv1})};
+              } else {
+                throw new RuntimeException("Should not fetch a third batch from the server");
+              }
+              count++;
+              return results;
+            }
+          });
+
+      // Server doesn't return the context
+      Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
+      // getServerHasMoreResults shouldn't be called when hasMoreResultsContext returns false
+      Mockito.when(callableWithReplicas.getServerHasMoreResults())
+          .thenThrow(new RuntimeException("Should not be called"));
+
+      // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
+      HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
+      Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
+      // Trigger the "no more data" branch for #nextScanner(...)
+      Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
+
+      csrs.loadCache();
+
+      List<Result> results = csrs.cache;
+      Iterator<Result> iter = results.iterator();
+      assertEquals(2, results.size());
+      for (int i = 3; i >= 2 && iter.hasNext(); i--) {
+        Result result = iter.next();
+        byte[] row = result.getRow();
+        assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
+        assertEquals(1, result.getMap().size());
+      }
+
+      // "consume" the Results
+      results.clear();
+
+      csrs.loadCache();
+
+      assertEquals(1, results.size());
+      Result result = results.get(0);
+      assertEquals("row1", new String(result.getRow(), StandardCharsets.UTF_8));
+      assertEquals(1, result.getMap().size());
+
+      assertTrue(csrs.closed);
+    }
+  }
+
+  @Test
+  public void testNoContextNoRecords() throws Exception {
+    ScannerCallableWithReplicas callableWithReplicas = Mockito
+        .mock(ScannerCallableWithReplicas.class);
+
+    // While the server return 2 records per RPC, we expect there to be more records.
+    scan.setCaching(2);
+
+    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+
+    try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
+        TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
+        Integer.MAX_VALUE)) {
+
+      csrs.setScannerCallableFactory(factory);
+
+      // Return some data the first time, less the second, and none after that
+      Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
+          .thenReturn(new Result[0]);
+
+      // Server doesn't return the context
+      Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
+      // Only have more results the first time
+      Mockito.when(callableWithReplicas.getServerHasMoreResults())
+          .thenThrow(new RuntimeException("Should not be called"));
+
+      // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
+      HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
+      Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
+      // Trigger the "no more data" branch for #nextScanner(...)
+      Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
+
+      csrs.loadCache();
+
+      assertEquals(0, csrs.cache.size());
+      assertTrue(csrs.closed);
+    }
+  }
+
+  @Test
+  public void testContextNoRecords() throws Exception {
+    ScannerCallableWithReplicas callableWithReplicas = Mockito
+        .mock(ScannerCallableWithReplicas.class);
+
+    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+
+    try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
+        TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
+        Integer.MAX_VALUE)) {
+
+      csrs.setScannerCallableFactory(factory);
+
+      // Return some data the first time, less the second, and none after that
+      Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
+          .thenReturn(new Result[0]);
+
+      // Server doesn't return the context
+      Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
+      // Only have more results the first time
+      Mockito.when(callableWithReplicas.getServerHasMoreResults())
+          .thenReturn(false);
+
+      // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
+      HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
+      Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
+      // Trigger the "no more data" branch for #nextScanner(...)
+      Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
+
+      csrs.loadCache();
+
+      assertEquals(0, csrs.cache.size());
+      assertTrue(csrs.closed);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d80b18/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java
new file mode 100644
index 0000000..90bf4bb
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test the ClientSmallScanner.
+ */
+@Category(SmallTests.class)
+public class TestClientSmallScanner {
+
+  Scan scan;
+  ExecutorService pool;
+  Configuration conf;
+
+  ClusterConnection clusterConn;
+  RpcRetryingCallerFactory rpcFactory;
+  RpcControllerFactory controllerFactory;
+  RpcRetryingCaller<Result[]> caller;
+
+  @Before
+  @SuppressWarnings({"deprecation", "unchecked"})
+  public void setup() throws IOException {
+    clusterConn = Mockito.mock(ClusterConnection.class);
+    rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
+    controllerFactory = Mockito.mock(RpcControllerFactory.class);
+    pool = Executors.newSingleThreadExecutor();
+    scan = new Scan();
+    conf = new Configuration();
+    Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
+    // Mock out the RpcCaller
+    caller = Mockito.mock(RpcRetryingCaller.class);
+    // Return the mock from the factory
+    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
+  }
+
+  @After
+  public void teardown() {
+    if (null != pool) {
+      pool.shutdownNow();
+    }
+  }
+
+  /**
+   * Create a simple Answer which returns true the first time, and false every time after.
+   */
+  private Answer<Boolean> createTrueThenFalseAnswer() {
+    return new Answer<Boolean>() {
+      boolean first = true;
+
+      @Override
+      public Boolean answer(InvocationOnMock invocation) {
+        if (first) {
+          first = false;
+          return true;
+        }
+        return false;
+      }
+    };
+  }
+
+  private SmallScannerCallableFactory getFactory(
+      final ScannerCallableWithReplicas callableWithReplicas) {
+    return new SmallScannerCallableFactory() {
+      @Override
+      public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
+          Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
+          RpcControllerFactory controllerFactory, ExecutorService pool,
+          int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
+          RpcRetryingCaller<Result[]> caller) {
+        return callableWithReplicas;
+      }
+    };
+  }
+
+  @Test
+  public void testContextPresent() throws Exception {
+    final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum);
+
+    ScannerCallableWithReplicas callableWithReplicas = Mockito
+        .mock(ScannerCallableWithReplicas.class);
+
+    // Mock out the RpcCaller
+    @SuppressWarnings("unchecked")
+    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
+    // Return the mock from the factory
+    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
+
+    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+
+    // Intentionally leave a "default" caching size in the Scan. No matter the value, we
+    // should continue based on the server context
+
+    try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
+        clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+
+      css.setScannerCallableFactory(factory);
+
+      // Return some data the first time, less the second, and none after that
+      Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
+          .thenAnswer(new Answer<Result[]>() {
+            int count = 0;
+
+            @Override
+            public Result[] answer(InvocationOnMock invocation) {
+              Result[] results;
+              if (0 == count) {
+                results = new Result[] {Result.create(new Cell[] {kv1}),
+                    Result.create(new Cell[] {kv2})};
+              } else if (1 == count) {
+                results = new Result[] {Result.create(new Cell[] {kv3})};
+              } else {
+                results = new Result[0];
+              }
+              count++;
+              return results;
+            }
+          });
+
+      // Pass back the context always
+      Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
+      // Only have more results the first time
+      Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
+          createTrueThenFalseAnswer());
+
+      // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
+      HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
+      Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
+      // Trigger the "no more data" branch for #nextScanner(...)
+      Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
+
+      css.loadCache();
+
+      List<Result> results = css.cache;
+      assertEquals(3, results.size());
+      for (int i = 1; i <= 3; i++) {
+        Result result = results.get(i - 1);
+        byte[] row = result.getRow();
+        assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
+        assertEquals(1, result.getMap().size());
+      }
+
+      assertTrue(css.closed);
+    }
+  }
+
+  @Test
+  public void testNoContextFewerRecords() throws Exception {
+    final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
+        Type.Maximum);
+
+    ScannerCallableWithReplicas callableWithReplicas = Mockito
+        .mock(ScannerCallableWithReplicas.class);
+
+    // While the server returns 2 records per batch, we expect more records.
+    scan.setCaching(2);
+    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+
+    try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
+        clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+
+      css.setScannerCallableFactory(factory);
+      // Return some data the first time, less the second, and none after that
+      Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
+          .thenAnswer(new Answer<Result[]>() {
+            int count = 0;
+
+            @Override
+            public Result[] answer(InvocationOnMock invocation) {
+              Result[] results;
+              if (0 == count) {
+                results = new Result[] {Result.create(new Cell[] {kv1}),
+                    Result.create(new Cell[] {kv2})};
+              } else if (1 == count) {
+                // Return fewer records than expected (2)
+                results = new Result[] {Result.create(new Cell[] {kv3})};
+              } else {
+                throw new RuntimeException("Should not fetch a third batch from the server");
+              }
+              count++;
+              return results;
+            }
+          });
+
+      // Server doesn't return the context
+      Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
+      // Only have more results the first time
+      Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
+          new RuntimeException("Should not be called"));
+
+      // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
+      HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
+      Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
+      // Trigger the "no more data" branch for #nextScanner(...)
+      Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
+
+      css.loadCache();
+
+      List<Result> results = css.cache;
+      assertEquals(2, results.size());
+      for (int i = 1; i <= 2; i++) {
+        Result result = results.get(i - 1);
+        byte[] row = result.getRow();
+        assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
+        assertEquals(1, result.getMap().size());
+      }
+
+      // "consume" the results we verified
+      results.clear();
+
+      css.loadCache();
+
+      assertEquals(1, results.size());
+      Result result = results.get(0);
+      assertEquals("row3", new String(result.getRow(), StandardCharsets.UTF_8));
+      assertEquals(1, result.getMap().size());
+      assertTrue(css.closed);
+    }
+  }
+
+  @Test
+  public void testNoContextNoRecords() throws Exception {
+    ScannerCallableWithReplicas callableWithReplicas = Mockito
+        .mock(ScannerCallableWithReplicas.class);
+
+    // While the server return 2 records per RPC, we expect there to be more records.
+    scan.setCaching(2);
+
+    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+
+    try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
+        clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+
+      css.setScannerCallableFactory(factory);
+
+      // Return some data the first time, less the second, and none after that
+      Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
+          .thenReturn(new Result[0]);
+
+      // Server doesn't return the context
+      Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
+      // Only have more results the first time
+      Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
+          new RuntimeException("Should not be called"));
+
+      // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
+      HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
+      Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
+      // Trigger the "no more data" branch for #nextScanner(...)
+      Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
+
+      css.loadCache();
+
+      assertEquals(0, css.cache.size());
+      assertTrue(css.closed);
+    }
+  }
+
+  @Test
+  public void testContextNoRecords() throws Exception {
+    ScannerCallableWithReplicas callableWithReplicas = Mockito
+        .mock(ScannerCallableWithReplicas.class);
+
+    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+
+    try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
+        clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+
+      css.setScannerCallableFactory(factory);
+
+      // Return some data the first time, less the second, and none after that
+      Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
+          .thenReturn(new Result[0]);
+
+      // Server doesn't return the context
+      Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
+      // Only have more results the first time
+      Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenReturn(false);
+
+      // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
+      HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
+      Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
+      // Trigger the "no more data" branch for #nextScanner(...)
+      Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
+
+      css.loadCache();
+
+      assertEquals(0, css.cache.size());
+      assertTrue(css.closed);
+    }
+  }
+}


Mime
View raw message