hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-17608 Add suspend support for RawScanResultConsumer
Date Wed, 22 Feb 2017 12:32:42 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 93e60153b -> f037f230f


HBASE-17608 Add suspend support for RawScanResultConsumer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f037f230
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f037f230
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f037f230

Branch: refs/heads/master
Commit: f037f230fd5a0b6f28e68b02f47efeb4dbc22694
Parents: 93e6015
Author: zhangduo <zhangduo@apache.org>
Authored: Wed Feb 22 15:26:10 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Wed Feb 22 20:32:03 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncClientScanner.java |   7 +-
 .../client/AsyncRpcRetryingCallerFactory.java   |  21 +-
 .../AsyncScanSingleRegionRpcRetryingCaller.java | 264 ++++++++++++++++---
 .../hbase/client/AsyncTableResultScanner.java   | 106 +++-----
 .../hadoop/hbase/client/RawAsyncTable.java      |  14 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |   3 +-
 .../hbase/client/RawScanResultConsumer.java     |  68 ++++-
 .../org/apache/hadoop/hbase/util/Threads.java   |   2 +-
 .../coprocessor/AsyncAggregationClient.java     |   8 +-
 .../client/TestAsyncTableScanRenewLease.java    | 150 +++++++++++
 .../hbase/client/TestAsyncTableScanner.java     |   1 -
 ...stAsyncTableScannerCloseWhileSuspending.java | 105 ++++++++
 .../hbase/client/TestRawAsyncTableScan.java     |   3 +-
 13 files changed, 607 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index b9fd34f..2215d36 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -130,11 +130,12 @@ class AsyncClientScanner {
 
   private void startScan(OpenScannerResponse resp) {
     conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
-        .stub(resp.stub).setScan(scan).consumer(consumer).resultCache(resultCache)
+        .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
+        .setScan(scan).consumer(consumer).resultCache(resultCache)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
         .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp)
-        .whenComplete((hasMore, error) -> {
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
+        .start(resp.controller, resp.resp).whenComplete((hasMore, error) -> {
           if (error != null) {
             consumer.onError(error);
             return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 82c5d63..9bc651d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -156,6 +156,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private HRegionLocation loc;
 
+    private long scannerLeaseTimeoutPeriodNs;
+
     private long scanTimeoutNs;
 
     private long rpcTimeoutNs;
@@ -190,6 +192,12 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod,
+        TimeUnit unit) {
+      this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod);
+      return this;
+    }
+
     public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
       this.scanTimeoutNs = unit.toNanos(scanTimeout);
       return this;
@@ -221,8 +229,8 @@ class AsyncRpcRetryingCallerFactory {
           checkNotNull(scan, "scan is null"), scannerId,
           checkNotNull(resultCache, "resultCache is null"),
           checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
-          checkNotNull(loc, "location is null"), pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs,
-          startLogErrorsCnt);
+          checkNotNull(loc, "location is null"), scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts,
+          scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -307,7 +315,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private long rpcTimeoutNs = -1L;
 
-    public MasterRequestCallerBuilder<T> action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
+    public MasterRequestCallerBuilder<T> action(
+        AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
       this.callable = callable;
       return this;
     }
@@ -338,9 +347,9 @@ class AsyncRpcRetryingCallerFactory {
     }
 
     public AsyncMasterRequestRpcRetryingCaller<T> build() {
-      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, checkNotNull(callable,
-        "action is null"), pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
-          startLogErrorsCnt);
+      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
+          checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
+          rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 3ef4a6f..c4e8448 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
+import com.google.common.base.Preconditions;
+
 import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -39,6 +43,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RawScanResultConsumer.ScanResumer;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -76,6 +81,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private final HRegionLocation loc;
 
+  private final long scannerLeaseTimeoutPeriodNs;
+
   private final long pauseNs;
 
   private final int maxAttempts;
@@ -104,10 +111,176 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private long nextCallSeq = -1L;
 
+  private enum ScanControllerState {
+    INITIALIZED, SUSPENDED, TERMINATED, DESTROYED
+  }
+
+  // Since suspend and terminate should only be called within onNext or onHeartbeat (see the
+  // comments of RawScanResultConsumer.onNext and onHeartbeat), we need some checks to prevent
+  // invalid usage. We use two mechanisms to prevent invalid usage:
+  // 1. Record the thread that constructs the ScanControllerImpl instance. We will throw an
+  // IllegalStateException if the caller thread is not this thread.
+  // 2. The ScanControllerState. The initial state is INITIALIZED; if you call suspend, the state
+  // is transitioned to SUSPENDED, and if you call terminate, the state is transitioned to
+  // TERMINATED. When we are back from onNext or onHeartbeat in the onComplete method, we
+  // call destroy to get the current state and set the state to DESTROYED. When the user calls
+  // suspend or terminate, we check whether the current state is INITIALIZED; if not, we throw
+  // an IllegalStateException. Notice that the DESTROYED state is necessary because you may call
+  // neither suspend nor terminate, in which case the state is still INITIALIZED when back from
+  // onNext or onHeartbeat. We need another state to replace INITIALIZED to prevent the
+  // controller from being used afterwards.
+  // Notice that the public methods of this class are supposed to be called by the upper layer
+  // only, and the package private methods can only be called within the implementation of
+  // AsyncScanSingleRegionRpcRetryingCaller.
+  private final class ScanControllerImpl implements RawScanResultConsumer.ScanController {
+
+    // Make sure the methods are only called in this thread.
+    private final Thread callerThread = Thread.currentThread();
+
+    // INITIALIZED -> SUSPENDED -> DESTROYED
+    // INITIALIZED -> TERMINATED -> DESTROYED
+    // INITIALIZED -> DESTROYED
+    // If the state is incorrect we will throw IllegalStateException.
+    private ScanControllerState state = ScanControllerState.INITIALIZED;
+
+    private ScanResumerImpl resumer;
+
+    private void preCheck() {
+      Preconditions.checkState(Thread.currentThread() == callerThread,
+        "The current thread is %s, expected thread is %s, " +
+            "you should not call this method outside onNext or onHeartbeat",
+        Thread.currentThread(), callerThread);
+      Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
+        "Invalid Stopper state %s", state);
+    }
+
+    @Override
+    public ScanResumer suspend() {
+      preCheck();
+      state = ScanControllerState.SUSPENDED;
+      ScanResumerImpl resumer = new ScanResumerImpl();
+      this.resumer = resumer;
+      return resumer;
+    }
+
+    @Override
+    public void terminate() {
+      preCheck();
+      state = ScanControllerState.TERMINATED;
+    }
+
+    // return the current state, and set the state to DESTROYED.
+    ScanControllerState destroy() {
+      ScanControllerState state = this.state;
+      this.state = ScanControllerState.DESTROYED;
+      return state;
+    }
+  }
+
+  private enum ScanResumerState {
+    INITIALIZED, SUSPENDED, RESUMED
+  }
+
+  // The resume method is allowed to be called in another thread, so here we also use the
+  // ScanResumerState to prevent races. The initial state is INITIALIZED, and in most cases, when
+  // back from onNext or onHeartbeat, we call the prepare method to change the state to SUSPENDED,
+  // and when the user calls the resume method, we change the state to RESUMED. But the resume
+  // method could be called in another thread, and in fact, the user could just do this:
+  // controller.suspend().resume()
+  // This is strange but valid. It means the scan could be resumed before we call the prepare
+  // method to do the actual suspend work. So in the resume method, we check whether the state is
+  // INITIALIZED; if it is, we just set the state to RESUMED and return. And in the prepare
+  // method, if the state is already RESUMED, we just return false and let the scan go on.
+  // Notice that the public methods of this class are supposed to be called by the upper layer
+  // only, and the package private methods can only be called within the implementation of
+  // AsyncScanSingleRegionRpcRetryingCaller.
+  private final class ScanResumerImpl implements RawScanResultConsumer.ScanResumer {
+
+    // INITIALIZED -> SUSPENDED -> RESUMED
+    // INITIALIZED -> RESUMED
+    private ScanResumerState state = ScanResumerState.INITIALIZED;
+
+    private ScanResponse resp;
+
+    private int numValidResults;
+
+    // If the scan is suspended successfully, we need to renew the lease to prevent the RS from
+    // closing the scanner due to lease expiration. The timer task is one-time, so we need to
+    // schedule a new task every time the previous one finishes. There could also be a race, as the
+    // renewal is executed in the timer thread, so we also need to check the state before renewing
+    // the lease. If the state is already RESUMED, we give up the lease renewal and do not schedule
+    // the next lease renewal task.
+    private Timeout leaseRenewer;
+
+    @Override
+    public void resume() {
+      // Just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
+      // just return at the first if condition without loading the resp and numValidResults fields.
+      // If resume is called after suspend, then it is also safe to just reference resp and
+      // numValidResults after the synchronized block as no one will change them anymore.
+      ScanResponse localResp;
+      int localNumValidResults;
+      synchronized (this) {
+        if (state == ScanResumerState.INITIALIZED) {
+          // user calls this method before we call prepare, so just set the state to
+          // RESUMED, the implementation will just go on.
+          state = ScanResumerState.RESUMED;
+          return;
+        }
+        if (state == ScanResumerState.RESUMED) {
+          // already resumed, give up.
+          return;
+        }
+        state = ScanResumerState.RESUMED;
+        if (leaseRenewer != null) {
+          leaseRenewer.cancel();
+        }
+        localResp = this.resp;
+        localNumValidResults = this.numValidResults;
+      }
+      completeOrNext(localResp, localNumValidResults);
+    }
+
+    private void scheduleRenewLeaseTask() {
+      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
+        TimeUnit.NANOSECONDS);
+    }
+
+    private synchronized void tryRenewLease() {
+      // the scan has already been resumed, give up
+      if (state == ScanResumerState.RESUMED) {
+        return;
+      }
+      renewLease();
+      // schedule the next renew lease task again as this is a one-time task.
+      scheduleRenewLeaseTask();
+    }
+
+    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
+    // for more details.
+    synchronized boolean prepare(ScanResponse resp, int numValidResults) {
+      if (state == ScanResumerState.RESUMED) {
+        // user calls resume before we actually suspend the scan, just continue;
+        return false;
+      }
+      state = ScanResumerState.SUSPENDED;
+      this.resp = resp;
+      this.numValidResults = numValidResults;
+      // if there are no more results in region then the scanner at RS side will be closed
+      // automatically so we do not need to renew lease.
+      if (resp.getMoreResultsInRegion()) {
+        // schedule renew lease task
+        scheduleRenewLeaseTask();
+      }
+      return true;
+    }
+  }
+
   public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
       AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
-      RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs,
-      int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+      RawScanResultConsumer consumer, Interface stub, HRegionLocation loc,
+      long scannerLeaseTimeoutPeriodNs, long pauseNs, int maxAttempts, long scanTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.scan = scan;
     this.scannerId = scannerId;
@@ -115,6 +288,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     this.consumer = consumer;
     this.stub = stub;
     this.loc = loc;
+    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
     this.pauseNs = pauseNs;
     this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
@@ -143,9 +317,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
     stub.scan(controller, req, resp -> {
       if (controller.failed()) {
-        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
-            + " for " + loc.getRegionInfo().getEncodedName() + " of "
-            + loc.getRegionInfo().getTable() + " failed, ignore, probably already closed",
+        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId +
+            " for " + loc.getRegionInfo().getEncodedName() + " of " +
+            loc.getRegionInfo().getTable() + " failed, ignore, probably already closed",
           controller.getFailed());
       }
     });
@@ -182,16 +356,15 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   private void onError(Throwable error) {
     error = translateException(error);
     if (tries > startLogErrorsCnt) {
-      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
-          + loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable()
-          + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
-          + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
-          + " ms",
+      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " +
+          loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable() +
+          " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " +
+          TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() +
+          " ms",
         error);
     }
-    boolean scannerClosed =
-        error instanceof UnknownScannerException || error instanceof NotServingRegionException
-            || error instanceof RegionServerStoppedException;
+    boolean scannerClosed = error instanceof UnknownScannerException ||
+        error instanceof NotServingRegionException || error instanceof RegionServerStoppedException;
     RetriesExhaustedException.ThrowableWithExtraContext qt =
         new RetriesExhaustedException.ThrowableWithExtraContext(error,
             EnvironmentEdgeManager.currentTime(), "");
@@ -229,7 +402,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private void updateNextStartRowWhenError(Result result) {
     nextStartRowWhenError = result.getRow();
-    includeNextStartRowWhenError = scan.getBatch() > 0 || result.isPartial();
+    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
   }
 
   private void completeWhenNoMoreResultsInRegion() {
@@ -248,6 +421,27 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
   }
 
+  private void completeOrNext(ScanResponse resp, int numValidResults) {
+    if (resp.hasMoreResults() && !resp.getMoreResults()) {
+      // RS tells us there is no more data for the whole scan
+      completeNoMoreResults();
+      return;
+    }
+    if (scan.getLimit() > 0) {
+      // The RS should have set the moreResults field in ScanResponse to false when we have reached
+      // the limit.
+      int limit = scan.getLimit() - numValidResults;
+      assert limit > 0;
+      scan.setLimit(limit);
+    }
+    // as in 2.0 this value will always be set
+    if (!resp.getMoreResultsInRegion()) {
+      completeWhenNoMoreResultsInRegion.run();
+      return;
+    }
+    next();
+  }
+
   private void onComplete(HBaseRpcController controller, ScanResponse resp) {
     if (controller.failed()) {
       onError(controller.getFailed());
@@ -269,20 +463,16 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       return;
     }
 
-    boolean stopByUser;
+    ScanControllerImpl scanController = new ScanControllerImpl();
     if (results.length == 0) {
       // if we have nothing to return then this must be a heartbeat message.
-      stopByUser = !consumer.onHeartbeat();
+      consumer.onHeartbeat(scanController);
     } else {
       updateNextStartRowWhenError(results[results.length - 1]);
-      stopByUser = !consumer.onNext(results);
+      consumer.onNext(results, scanController);
     }
-    if (resp.hasMoreResults() && !resp.getMoreResults()) {
-      // RS tells us there is no more data for the whole scan
-      completeNoMoreResults();
-      return;
-    }
-    if (stopByUser) {
+    ScanControllerState state = scanController.destroy();
+    if (state == ScanControllerState.TERMINATED) {
       if (resp.getMoreResultsInRegion()) {
         // we have more results in region but user request to stop the scan, so we need to close the
         // scanner explicitly.
@@ -291,19 +481,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       completeNoMoreResults();
       return;
     }
-    if (scan.getLimit() > 0) {
-      // The RS should have set the moreResults field in ScanResponse to false when we have reached
-      // the limit.
-      int limit = scan.getLimit() - results.length;
-      assert limit > 0;
-      scan.setLimit(limit);
-    }
-    // as in 2.0 this value will always be set
-    if (!resp.getMoreResultsInRegion()) {
-      completeWhenNoMoreResultsInRegion.run();
-      return;
+    if (state == ScanControllerState.SUSPENDED) {
+      if (scanController.resumer.prepare(resp, results.length)) {
+        return;
+      }
     }
-    next();
+    completeOrNext(resp, results.length);
   }
 
   private void call() {
@@ -337,6 +520,15 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     call();
   }
 
+  private void renewLease() {
+    nextCallSeq++;
+    resetController(controller, rpcTimeoutNs);
+    ScanRequest req =
+        RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
+    stub.scan(controller, req, resp -> {
+    });
+  }
+
   /**
    * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
    * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index e2c4ec3..38d4b2c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 
 import java.io.IOException;
@@ -29,9 +29,7 @@ import java.util.Queue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
@@ -45,8 +43,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
 
   private final RawAsyncTable rawTable;
 
-  private final Scan scan;
-
   private final long maxCacheSize;
 
   private final Queue<Result> queue = new ArrayDeque<>();
@@ -57,16 +53,10 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
 
   private Throwable error;
 
-  private boolean prefetchStopped;
-
-  private int numberOfOnCompleteToIgnore;
-
-  // used to filter out cells that already returned when we restart a scan
-  private Cell lastCell;
+  private ScanResumer resumer;
 
   public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) {
     this.rawTable = table;
-    this.scan = scan;
     this.maxCacheSize = maxCacheSize;
     table.scan(scan, this);
   }
@@ -76,71 +66,36 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
     cacheSize += calcEstimatedSize(result);
   }
 
-  private void stopPrefetch(Result lastResult) {
-    prefetchStopped = true;
-    if (lastResult.isPartial() || scan.getBatch() > 0) {
-      scan.withStartRow(lastResult.getRow());
-      lastCell = lastResult.rawCells()[lastResult.rawCells().length - 1];
-    } else {
-      scan.withStartRow(lastResult.getRow(), false);
-    }
+  private void stopPrefetch(ScanController controller) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        String.format("0x%x", System.identityHashCode(this)) + " stop prefetching when scanning "
-            + rawTable.getName() + " as the cache size " + cacheSize
-            + " is greater than the maxCacheSize " + maxCacheSize + ", the next start row is "
-            + Bytes.toStringBinary(scan.getStartRow()) + ", lastCell is " + lastCell);
+      LOG.debug(String.format("0x%x", System.identityHashCode(this)) +
+          " stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
+          cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
     }
-    // Ignore an onComplete call as the scan is stopped by us.
-    // Here we can not use a simple boolean flag. A scan operation can cross multiple regions and
-    // the regions may be located on different regionservers, so it is possible that the methods of
-    // RawScanResultConsumer are called in different rpc framework threads and overlapped with each
-    // other. It may happen that
-    // 1. we stop scan1
-    // 2. we start scan2
-    // 3. we stop scan2
-    // 4. onComplete for scan1 is called
-    // 5. onComplete for scan2 is called
-    // So if we use a boolean flag here then we can only ignore the onComplete in step4 and think
-    // that the onComplete in step 5 tells us there is no data.
-    numberOfOnCompleteToIgnore++;
+    resumer = controller.suspend();
   }
 
   @Override
-  public synchronized boolean onNext(Result[] results) {
+  public synchronized void onNext(Result[] results, ScanController controller) {
     assert results.length > 0;
     if (closed) {
-      return false;
-    }
-    Result firstResult = results[0];
-    if (lastCell != null) {
-      firstResult = filterCells(firstResult, lastCell);
-      if (firstResult != null) {
-        // do not set lastCell to null if the result after filtering is null as there may still be
-        // other cells that can be filtered out
-        lastCell = null;
-        addToCache(firstResult);
-      } else if (results.length == 1) {
-        // the only one result is null
-        return true;
-      }
-    } else {
-      addToCache(firstResult);
+      controller.terminate();
+      return;
     }
-    for (int i = 1; i < results.length; i++) {
-      addToCache(results[i]);
+    for (Result result : results) {
+      addToCache(result);
     }
     notifyAll();
-    if (cacheSize < maxCacheSize) {
-      return true;
+    if (cacheSize >= maxCacheSize) {
+      stopPrefetch(controller);
     }
-    stopPrefetch(results[results.length - 1]);
-    return false;
   }
 
   @Override
-  public synchronized boolean onHeartbeat() {
-    return !closed;
+  public synchronized void onHeartbeat(ScanController controller) {
+    if (closed) {
+      controller.terminate();
+    }
   }
 
   @Override
@@ -150,12 +105,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
 
   @Override
   public synchronized void onComplete() {
-    // Do not mark the scanner as closed if the scan is stopped by us due to cache size limit since
-    // we may resume later by starting a new scan. See resumePrefetch.
-    if (numberOfOnCompleteToIgnore > 0) {
-      numberOfOnCompleteToIgnore--;
-      return;
-    }
     closed = true;
     notifyAll();
   }
@@ -164,8 +113,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
     if (LOG.isDebugEnabled()) {
       LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching");
     }
-    prefetchStopped = false;
-    rawTable.scan(scan, this);
+    resumer.resume();
+    resumer = null;
   }
 
   @Override
@@ -186,7 +135,7 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
     }
     Result result = queue.poll();
     cacheSize -= calcEstimatedSize(result);
-    if (prefetchStopped && cacheSize <= maxCacheSize / 2) {
+    if (resumer != null && cacheSize <= maxCacheSize / 2) {
       resumePrefetch();
     }
     return result;
@@ -197,13 +146,22 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
     closed = true;
     queue.clear();
     cacheSize = 0;
+    if (resumer != null) {
+      resumePrefetch();
+    }
     notifyAll();
   }
 
   @Override
   public boolean renewLease() {
-    // we will do prefetching in the background and if there is no space we will just terminate the
-    // background scan operation. So there is no reason to renew lease here.
+    // we will do prefetching in the background and if there is no space we will just suspend the
+    // scanner. The renew lease operation will be handled in the background.
     return false;
   }
+
+  // used in tests to test whether the scanner has been suspended
+  @VisibleForTesting
+  synchronized boolean isSuspended() {
+    return resumer != null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
index 59924cf..e493123 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -52,13 +52,13 @@ public interface RawAsyncTable extends AsyncTableBase {
 
   /**
    * The basic scan API uses the observer pattern. All results that match the given scan object will
-   * be passed to the given {@code consumer} by calling
-   * {@link RawScanResultConsumer#onNext(Result[])}. {@link RawScanResultConsumer#onComplete()}
-   * means the scan is finished, and {@link RawScanResultConsumer#onError(Throwable)} means we hit
-   * an unrecoverable error and the scan is terminated. {@link RawScanResultConsumer#onHeartbeat()}
-   * means the RS is still working but we can not get a valid result to call
-   * {@link RawScanResultConsumer#onNext(Result[])}. This is usually because the matched results are
-   * too sparse, for example, a filter which almost filters out everything is specified.
+   * be passed to the given {@code consumer} by calling {@code RawScanResultConsumer.onNext}.
+   * {@code RawScanResultConsumer.onComplete} means the scan is finished, and
+   * {@code RawScanResultConsumer.onError} means we hit an unrecoverable error and the scan is
+   * terminated. {@code RawScanResultConsumer.onHeartbeat} means the RS is still working but we
+   * cannot get a valid result to call {@code RawScanResultConsumer.onNext}. This is usually because
+   * the matched results are too sparse, for example, when a filter that filters out almost
+   * everything is specified.
    * <p>
    * Notice that, the methods of the given {@code consumer} will be called directly in the rpc
    * framework's callback thread, so typically you should not do any time consuming work inside

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 00f255e..fa3d792 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -358,9 +358,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
     scan(scan, new RawScanResultConsumer() {
 
       @Override
-      public boolean onNext(Result[] results) {
+      public void onNext(Result[] results, ScanController controller) {
+        // The controller is never used: the scan is neither suspended nor terminated here, so
+        // every batch is simply collected.
         scanResults.addAll(Arrays.asList(results));
-        return true;
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
index 2e5d422..17e0afa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
@@ -24,10 +24,10 @@ import org.apache.hadoop.hbase.client.Result;
 /**
  * Receives {@link Result} for an asynchronous scan.
  * <p>
- * Notice that, the {@link #onNext(Result[])} method will be called in the thread which we send
- * request to HBase service. So if you want the asynchronous scanner fetch data from HBase in
- * background while you process the returned data, you need to move the processing work to another
- * thread to make the {@code onNext} call return immediately. And please do NOT do any time
+ * Notice that the {@link #onNext(Result[], ScanController)} method will be called in the thread
+ * that we use to send requests to the HBase service. So if you want the asynchronous scanner to
+ * fetch data from HBase in the background while you process the returned data, you need to move
+ * the processing work to another thread to make the {@code onNext} call return immediately. And
+ * please do NOT do any time
  * consuming tasks in all methods below unless you know what you are doing.
  */
 @InterfaceAudience.Public
@@ -35,20 +35,70 @@ import org.apache.hadoop.hbase.client.Result;
 public interface RawScanResultConsumer {
 
   /**
+   * Used to resume a scan.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  interface ScanResumer {
+
+    /**
+     * Resume the scan. You are free to call it multiple times, but only the first call takes
+     * effect.
+     */
+    void resume();
+  }
+
+  /**
+   * Used to suspend or terminate a scan.
+   * <p>
+   * Notice that you should only call the methods below inside the onNext or onHeartbeat method. An
+   * IllegalStateException will be thrown if you call them anywhere else.
+   * <p>
+   * You can call at most one of the methods below, i.e., either suspend or terminate (you are also
+   * free to call neither), and the methods are not reentrant. An IllegalStateException will be
+   * thrown if you have already called one of them.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  interface ScanController {
+
+    /**
+     * Suspend the scan.
+     * <p>
+     * This means we will stop fetching data in the background, i.e., onNext will not be called
+     * again until you resume the scan.
+     * @return A resumer used to resume the scan later.
+     * @throws IllegalStateException if called outside onNext/onHeartbeat, or if suspend or
+     *           terminate has already been called.
+     */
+    ScanResumer suspend();
+
+    /**
+     * Terminate the scan.
+     * <p>
+     * This is useful when you have got enough results and want to stop the scan in the onNext
+     * method, or when you want to stop the scan in the onHeartbeat method because it has spent too
+     * much time.
+     * @throws IllegalStateException if called outside onNext/onHeartbeat, or if suspend or
+     *           terminate has already been called.
+     */
+    void terminate();
+  }
+
+  /**
+   * Indicate that we have received some data.
    * @param results the data fetched from HBase service.
-   * @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
+   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
+   *          instance is only valid within the scope of the onNext method. You can only call its
+   *          methods in onNext; do NOT store it and call it later outside onNext.
    */
-  boolean onNext(Result[] results);
+  void onNext(Result[] results, ScanController controller);
 
   /**
    * Indicate that there is an heartbeat message but we have not cumulated enough cells to call
    * onNext.
    * <p>
    * This method give you a chance to terminate a slow scan operation.
-   * @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
+   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
+   *          instance is only valid within the scope of the onHeartbeat method. You can only call
+   *          its methods in onHeartbeat; do NOT store it and call it later outside onHeartbeat.
    */
-  default boolean onHeartbeat() {
-    return true;
+  default void onHeartbeat(ScanController controller) {
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index d10e0f2..21b376c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -147,7 +147,7 @@ public class Threads {
     try {
       Thread.sleep(millis);
     } catch (InterruptedException e) {
-      e.printStackTrace();
+      LOG.warn("sleep interrupted", e);
       Thread.currentThread().interrupt();
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
index f8d0a19..30f3d30 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
@@ -407,7 +407,7 @@ public class AsyncAggregationClient {
       private R value = null;
 
       @Override
-      public boolean onNext(Result[] results) {
+      public void onNext(Result[] results, ScanController controller) {
         try {
           for (Result result : results) {
             Cell weightCell = result.getColumnLatestCell(family, weightQualifier);
@@ -419,15 +419,15 @@ public class AsyncAggregationClient {
               } else {
                 future.completeExceptionally(new NoSuchElementException());
               }
-              return false;
+              controller.terminate();
+              return;
             }
             Cell valueCell = result.getColumnLatestCell(family, valueQualifier);
             value = ci.getValue(family, valueQualifier, valueCell);
           }
-          return true;
         } catch (IOException e) {
           future.completeExceptionally(e);
-          return false;
+          controller.terminate();
         }
       }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java
new file mode 100644
index 0000000..a70b8d2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests that a raw async scan which stays suspended for longer than the scanner lease timeout still
+ * completes normally once resumed, and that it does not observe a row written while it was
+ * suspended.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScanRenewLease {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static AsyncConnection CONN;
+
+  private static RawAsyncTable TABLE;
+
+  // Kept short so that the suspension in the consumer (twice this value) outlives the lease.
+  private static final int SCANNER_LEASE_TIMEOUT_PERIOD_MS = 5000;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      SCANNER_LEASE_TIMEOUT_PERIOD_MS);
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    TABLE = CONN.getRawTable(TABLE_NAME);
+    // Rows "00".."09", each holding its index as the value.
+    TABLE.putAll(IntStream.range(0, 10).mapToObj(
+      i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
+        .collect(Collectors.toList())).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Collects all results. On the first batch it suspends the scan, sleeps past the lease timeout
+   * on a separate thread, writes row "10", and then resumes the scan.
+   */
+  private static final class RenewLeaseConsumer implements RawScanResultConsumer {
+
+    private final List<Result> results = new ArrayList<>();
+
+    private Throwable error;
+
+    private boolean finished = false;
+
+    // Set once the scan has been suspended, so that it is only suspended on the first batch.
+    private boolean suspended = false;
+
+    @Override
+    public synchronized void onNext(Result[] results, ScanController controller) {
+      for (Result result : results) {
+        this.results.add(result);
+      }
+      if (!suspended) {
+        ScanResumer resumer = controller.suspend();
+        // Record the suspension; otherwise every later batch would suspend the scan again and
+        // spawn another sleeping thread.
+        suspended = true;
+        new Thread(() -> {
+          // Stay suspended well past the lease timeout.
+          Threads.sleep(2 * SCANNER_LEASE_TIMEOUT_PERIOD_MS);
+          try {
+            // Written while the scan is suspended; the resumed scan must not return it.
+            TABLE.put(new Put(Bytes.toBytes(String.format("%02d", 10))).addColumn(FAMILY, CQ,
+              Bytes.toBytes(10))).get();
+          } catch (Exception e) {
+            onError(e);
+          }
+          resumer.resume();
+        }).start();
+      }
+    }
+
+    @Override
+    public synchronized void onError(Throwable error) {
+      this.finished = true;
+      this.error = error;
+      notifyAll();
+    }
+
+    @Override
+    public synchronized void onComplete() {
+      this.finished = true;
+      notifyAll();
+    }
+
+    /**
+     * Blocks until onComplete or onError has been called.
+     * @return the collected results
+     * @throws Throwable the error passed to onError, if any
+     */
+    public synchronized List<Result> get() throws Throwable {
+      while (!finished) {
+        wait();
+      }
+      if (error != null) {
+        throw error;
+      }
+      return results;
+    }
+  }
+
+  @Test
+  public void test() throws Throwable {
+    RenewLeaseConsumer consumer = new RenewLeaseConsumer();
+    TABLE.scan(new Scan(), consumer);
+    List<Result> results = consumer.get();
+    // should not see the newly added value
+    assertEquals(10, results.size());
+    IntStream.range(0, 10).forEach(i -> {
+      Result result = results.get(i);
+      assertEquals(String.format("%02d", i), Bytes.toString(result.getRow()));
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
+    });
+    // now we can see the newly added value
+    List<Result> results2 = TABLE.scanAll(new Scan()).get();
+    assertEquals(11, results2.size());
+    IntStream.range(0, 11).forEach(i -> {
+      Result result = results2.get(i);
+      assertEquals(String.format("%02d", i), Bytes.toString(result.getRow()));
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java
index 006770d..a3cad17 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java
@@ -82,5 +82,4 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
     }
     return results;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
new file mode 100644
index 0000000..0f132d1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests that closing an {@link AsyncTableResultScanner} while its background prefetching is
+ * suspended releases the scanner held on the region server.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScannerCloseWhileSuspending {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static AsyncConnection CONN;
+
+  private static AsyncTable TABLE;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    TABLE = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+    // Rows "00".."99", each holding its index as the value.
+    TABLE.putAll(IntStream.range(0, 100).mapToObj(
+      i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
+        .collect(Collectors.toList())).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Sums the number of open scanners over all region servers in the mini cluster. */
+  private int getScannersCount() {
+    return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(t -> t.getRegionServer()).mapToInt(rs -> rs.getRSRpcServices().getScannersCount())
+        .sum();
+  }
+
+  @Test
+  public void testCloseScannerWhileSuspending() throws Exception {
+    // NOTE(review): a max result size of 1 presumably keeps each RPC batch tiny so that the
+    // scanner's client-side cache fills up and prefetching gets suspended; confirm.
+    try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) {
+      TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
+
+        @Override
+        public boolean evaluate() throws Exception {
+          return ((AsyncTableResultScanner) scanner).isSuspended();
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          // Reported when the wait times out, i.e. the scanner was NOT suspended.
+          return "The given scanner has not been suspended in time";
+        }
+      });
+      // While suspended, the scan still holds its scanner on the region server.
+      assertEquals(1, getScannersCount());
+    }
+    // Closing the suspended scanner must eventually release the server-side scanner.
+    TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return getScannersCount() == 0;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Still have " + getScannersCount() + " scanners opened";
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f037f230/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
index a8ef353..0be236d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
@@ -48,12 +48,11 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
     private Throwable error;
 
     @Override
-    public synchronized boolean onNext(Result[] results) {
+    public synchronized void onNext(Result[] results, ScanController controller) {
+      // Queue every result; the controller is never used, so this consumer neither suspends nor
+      // terminates the scan.
       for (Result result : results) {
         queue.offer(result);
       }
+      // presumably wakes a reader blocked on this monitor waiting for queued results; verify
       notifyAll();
-      return true;
     }
 
     @Override


Mime
View raw message