hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [42/50] [abbrv] hbase git commit: HBASE-18553 Expose scan cursor for asynchronous scanner
Date Thu, 17 Aug 2017 19:15:40 GMT
HBASE-18553 Expose scan cursor for asynchronous scanner


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4c74a73d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4c74a73d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4c74a73d

Branch: refs/heads/HBASE-14070.HLC
Commit: 4c74a73d57e09fd2c0ecde862a196c28dc6cd219
Parents: 2a9cdd5
Author: zhangduo <zhangduo@apache.org>
Authored: Tue Aug 15 17:15:06 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Wed Aug 16 21:04:57 2017 +0800

----------------------------------------------------------------------
 .../AsyncScanSingleRegionRpcRetryingCaller.java |  35 ++++-
 .../hbase/client/AsyncTableResultScanner.java   |  20 ++-
 .../hbase/client/RawScanResultConsumer.java     |  11 +-
 .../client/AbstractTestResultScannerCursor.java |  89 +++++++++++
 .../client/TestAsyncResultScannerCursor.java    |  49 ++++++
 .../hbase/client/TestRawAsyncScanCursor.java    | 157 +++++++++++++------
 .../hbase/client/TestResultScannerCursor.java   |  34 ++++
 .../hadoop/hbase/client/TestScanCursor.java     |  90 -----------
 8 files changed, 330 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 02a4357..d16cb8b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -157,10 +158,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
     private ScanResumerImpl resumer;
 
-    public ScanControllerImpl(ScanResponse resp) {
-      callerThread = Thread.currentThread();
-      cursor = resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
-          : Optional.empty();
+    public ScanControllerImpl(Optional<Cursor> cursor) {
+      this.callerThread = Thread.currentThread();
+      this.cursor = cursor;
     }
 
     private void preCheck() {
@@ -476,10 +476,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
     updateServerSideMetrics(scanMetrics, resp);
     boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
+    Result[] rawResults;
     Result[] results;
     int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
     try {
-      Result[] rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
+      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
       updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
       results = resultCache.addAndGet(
         Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
@@ -493,12 +494,30 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       return;
     }
 
-    ScanControllerImpl scanController = new ScanControllerImpl(resp);
+    ScanControllerImpl scanController;
     if (results.length > 0) {
+      scanController = new ScanControllerImpl(
+          resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
+              : Optional.empty());
       updateNextStartRowWhenError(results[results.length - 1]);
       consumer.onNext(results, scanController);
-    } else if (resp.hasHeartbeatMessage() && resp.getHeartbeatMessage()) {
-      consumer.onHeartbeat(scanController);
+    } else {
+      Optional<Cursor> cursor = Optional.empty();
+      if (resp.hasCursor()) {
+        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
+      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
+        // It is size limit exceed and we need to return the last Result's row.
+        // When user setBatch and the scanner is reopened, the server may return Results
that
+        // user has seen and the last Result can not be seen because the number is not enough.
+        // So the row keys of results may not be same, we must use the last one.
+        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
+      }
+      scanController = new ScanControllerImpl(cursor);
+      if (isHeartbeatMessage || cursor.isPresent()) {
+        // only call onHeartbeat if server tells us explicitly this is a heartbeat message,
or we
+        // want to pass a cursor to upper layer.
+        consumer.onHeartbeat(scanController);
+      }
     }
     ScanControllerState state = scanController.destroy();
     if (state == ScanControllerState.TERMINATED) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 28a5568..3050084 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -19,9 +19,6 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayDeque;
@@ -31,6 +28,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
 
 /**
  * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
@@ -46,6 +45,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer
{
 
   private final long maxCacheSize;
 
+  private final Scan scan;
+
   private final Queue<Result> queue = new ArrayDeque<>();
 
   private ScanMetrics scanMetrics;
@@ -61,6 +62,7 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer
{
   public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) {
     this.rawTable = table;
     this.maxCacheSize = maxCacheSize;
+    this.scan = scan;
     table.scan(scan, this);
   }
 
@@ -98,6 +100,10 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer
{
   public synchronized void onHeartbeat(ScanController controller) {
     if (closed) {
       controller.terminate();
+      return;
+    }
+    if (scan.isNeedCursorResult()) {
+      controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
     }
   }
 
@@ -143,9 +149,11 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer
{
       }
     }
     Result result = queue.poll();
-    cacheSize -= calcEstimatedSize(result);
-    if (resumer != null && cacheSize <= maxCacheSize / 2) {
-      resumePrefetch();
+    if (!result.isCursor()) {
+      cacheSize -= calcEstimatedSize(result);
+      if (resumer != null && cacheSize <= maxCacheSize / 2) {
+        resumePrefetch();
+      }
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
index 54d4887..4fedb0f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
@@ -95,8 +95,15 @@ public interface RawScanResultConsumer {
   void onNext(Result[] results, ScanController controller);
 
   /**
-   * Indicate that there is an heartbeat message but we have not cumulated enough cells to
call
-   * onNext.
+   * Indicate that there is a heartbeat message but we have not cumulated enough cells to
call
+   * {@link #onNext(Result[], ScanController)}.
+   * <p>
+   * Note that this method will always be called when RS returns something to us but we do
not have
+   * enough cells to call {@link #onNext(Result[], ScanController)}. Sometimes it may not
be a
+   * 'heartbeat' message for RS, for example, we have a large row with many cells and size
limit is
+   * exceeded before sending all the cells for this row. For RS it does send some data to
us and the
+   * time limit has not been reached, but we can not return the data to client so here we
call this
+   * method to tell client we have already received something.
    * <p>
    * This method give you a chance to terminate a slow scan operation.
    * @param controller used to suspend or terminate the scan. Notice that the {@code controller}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java
new file mode 100644
index 0000000..3df7a7b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+public abstract class AbstractTestResultScannerCursor extends AbstractTestScanCursor {
+
+  protected abstract ResultScanner getScanner(Scan scan) throws Exception;
+
+  @Test
+  public void testHeartbeatWithSparseFilter() throws Exception {
+    try (ResultScanner scanner = getScanner(createScanWithSparseFilter())) {
+      int num = 0;
+      Result r;
+      while ((r = scanner.next()) != null) {
+        if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
+          assertTrue(r.isCursor());
+          assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
+            r.getCursor().getRow());
+        } else {
+          assertFalse(r.isCursor());
+          assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
+        }
+        num++;
+      }
+    }
+  }
+
+  @Test
+  public void testHeartbeatWithSparseFilterReversed() throws Exception {
+    try (ResultScanner scanner = getScanner(createReversedScanWithSparseFilter())) {
+      int num = 0;
+      Result r;
+      while ((r = scanner.next()) != null) {
+        if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
+          assertTrue(r.isCursor());
+          assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS],
+            r.getCursor().getRow());
+        } else {
+          assertFalse(r.isCursor());
+          assertArrayEquals(ROWS[0], r.getRow());
+        }
+        num++;
+      }
+    }
+  }
+
+  @Test
+  public void testSizeLimit() throws IOException {
+    try (ResultScanner scanner =
+        TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSizeLimit()))
{
+      int num = 0;
+      Result r;
+      while ((r = scanner.next()) != null) {
+        if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1)
{
+          assertTrue(r.isCursor());
+          assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
+            r.getCursor().getRow());
+        } else {
+          assertFalse(r.isCursor());
+          assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
+        }
+        num++;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java
new file mode 100644
index 0000000..5aebb4a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.ForkJoinPool;
+
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncResultScannerCursor extends AbstractTestResultScannerCursor {
+
+  private static AsyncConnection CONN;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    AbstractTestScanCursor.setUpBeforeClass();
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+  }
+
+  public static void tearDownAfterClass() throws Exception {
+    if (CONN != null) {
+      CONN.close();
+    }
+    AbstractTestScanCursor.tearDownAfterClass();
+  }
+
+  @Override
+  protected ResultScanner getScanner(Scan scan) throws Exception {
+    return CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
index 9caf942..4bca451 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
@@ -27,70 +27,83 @@ import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({ MediumTests.class, ClientTests.class })
 public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
 
+  private static AsyncConnection CONN;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    AbstractTestScanCursor.setUpBeforeClass();
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+  }
+
+  public static void tearDownAfterClass() throws Exception {
+    if (CONN != null) {
+      CONN.close();
+    }
+    AbstractTestScanCursor.tearDownAfterClass();
+  }
+
   private void doTest(boolean reversed)
       throws InterruptedException, ExecutionException, IOException {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    try (AsyncConnection conn =
-        ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
-      RawAsyncTable table = conn.getRawTable(TABLE_NAME);
-      table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(),
-        new RawScanResultConsumer() {
-
-          private int count;
-
-          @Override
-          public void onHeartbeat(ScanController controller) {
-            int row = count / NUM_FAMILIES / NUM_QUALIFIERS;
-            if (reversed) {
-              row = NUM_ROWS - 1 - row;
-            }
-            try {
-              assertArrayEquals(ROWS[row], controller.cursor().get().getRow());
-              count++;
-            } catch (Throwable e) {
-              future.completeExceptionally(e);
-              throw e;
-            }
+    RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
+    table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(),
+      new RawScanResultConsumer() {
+
+        private int count;
+
+        @Override
+        public void onHeartbeat(ScanController controller) {
+          int row = count / NUM_FAMILIES / NUM_QUALIFIERS;
+          if (reversed) {
+            row = NUM_ROWS - 1 - row;
+          }
+          try {
+            assertArrayEquals(ROWS[row], controller.cursor().get().getRow());
+            count++;
+          } catch (Throwable e) {
+            future.completeExceptionally(e);
+            throw e;
           }
+        }
 
-          @Override
-          public void onNext(Result[] results, ScanController controller) {
-            try {
-              assertEquals(1, results.length);
-              assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
-              // we will always provide a scan cursor if time limit is reached.
-              if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) {
-                assertFalse(controller.cursor().isPresent());
-              } else {
-                assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
-                  controller.cursor().get().getRow());
-              }
-              assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
-              count++;
-            } catch (Throwable e) {
-              future.completeExceptionally(e);
-              throw e;
+        @Override
+        public void onNext(Result[] results, ScanController controller) {
+          try {
+            assertEquals(1, results.length);
+            assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
+            // we will always provide a scan cursor if time limit is reached.
+            if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) {
+              assertFalse(controller.cursor().isPresent());
+            } else {
+              assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
+                controller.cursor().get().getRow());
             }
+            assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
+            count++;
+          } catch (Throwable e) {
+            future.completeExceptionally(e);
+            throw e;
           }
+        }
 
-          @Override
-          public void onError(Throwable error) {
-            future.completeExceptionally(error);
-          }
+        @Override
+        public void onError(Throwable error) {
+          future.completeExceptionally(error);
+        }
 
-          @Override
-          public void onComplete() {
-            future.complete(null);
-          }
-        });
-      future.get();
-    }
+        @Override
+        public void onComplete() {
+          future.complete(null);
+        }
+      });
+    future.get();
   }
 
   @Test
@@ -104,4 +117,50 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
       throws IOException, InterruptedException, ExecutionException {
     doTest(true);
   }
+
+  @Test
+  public void testSizeLimit() throws InterruptedException, ExecutionException {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
+    table.scan(createScanWithSizeLimit(), new RawScanResultConsumer() {
+
+      private int count;
+
+      @Override
+      public void onHeartbeat(ScanController controller) {
+        try {
+          assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS],
+            controller.cursor().get().getRow());
+          count++;
+        } catch (Throwable e) {
+          future.completeExceptionally(e);
+          throw e;
+        }
+      }
+
+      @Override
+      public void onNext(Result[] results, ScanController controller) {
+        try {
+          assertFalse(controller.cursor().isPresent());
+          assertEquals(1, results.length);
+          assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS], results[0].getRow());
+          count++;
+        } catch (Throwable e) {
+          future.completeExceptionally(e);
+          throw e;
+        }
+      }
+
+      @Override
+      public void onError(Throwable error) {
+        future.completeExceptionally(error);
+      }
+
+      @Override
+      public void onComplete() {
+        future.complete(null);
+      }
+    });
+    future.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java
new file mode 100644
index 0000000..3b2ef2c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestResultScannerCursor extends AbstractTestResultScannerCursor {
+
+  @Override
+  protected ResultScanner getScanner(Scan scan) throws IOException {
+    return TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(scan);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java
deleted file mode 100644
index f7798f0..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ MediumTests.class, ClientTests.class })
-public class TestScanCursor extends AbstractTestScanCursor {
-
-  @Test
-  public void testHeartbeatWithSparseFilter() throws Exception {
-    try (ResultScanner scanner =
-        TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSparseFilter()))
{
-      int num = 0;
-      Result r;
-      while ((r = scanner.next()) != null) {
-        if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
-          Assert.assertTrue(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
-            r.getCursor().getRow());
-        } else {
-          Assert.assertFalse(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
-        }
-        num++;
-      }
-    }
-  }
-
-  @Test
-  public void testHeartbeatWithSparseFilterReversed() throws Exception {
-    try (ResultScanner scanner = TEST_UTIL.getConnection().getTable(TABLE_NAME)
-        .getScanner(createReversedScanWithSparseFilter())) {
-      int num = 0;
-      Result r;
-      while ((r = scanner.next()) != null) {
-        if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
-          Assert.assertTrue(r.isCursor());
-          Assert.assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS],
-            r.getCursor().getRow());
-        } else {
-          Assert.assertFalse(r.isCursor());
-          Assert.assertArrayEquals(ROWS[0], r.getRow());
-        }
-        num++;
-      }
-    }
-  }
-
-  @Test
-  public void testSizeLimit() throws IOException {
-    try (ResultScanner scanner =
-        TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSizeLimit()))
{
-      int num = 0;
-      Result r;
-      while ((r = scanner.next()) != null) {
-        if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1)
{
-          Assert.assertTrue(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
-            r.getCursor().getRow());
-        } else {
-          Assert.assertFalse(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
-        }
-        num++;
-      }
-    }
-  }
-}


Mime
View raw message