hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-17595 addendum fix the problem for mayHaveMoreCellsInRow
Date Thu, 23 Mar 2017 13:00:12 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 9726c7168 -> 849ab5ff2


HBASE-17595 addendum fix the problem for mayHaveMoreCellsInRow


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/849ab5ff
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/849ab5ff
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/849ab5ff

Branch: refs/heads/branch-1
Commit: 849ab5ff2998192d4f21d49f8356cc9a4370743a
Parents: 9726c71
Author: zhangduo <zhangduo@apache.org>
Authored: Thu Mar 23 15:47:26 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Thu Mar 23 20:35:12 2017 +0800

----------------------------------------------------------------------
 .../client/AllowPartialScanResultCache.java     |  34 +++-
 .../hbase/client/BatchScanResultCache.java      |  41 ++++-
 .../hadoop/hbase/client/ClientScanner.java      |  17 +-
 .../hbase/client/CompleteScanResultCache.java   |  24 ++-
 .../hadoop/hbase/client/ConnectionUtils.java    |  24 ---
 .../org/apache/hadoop/hbase/client/Scan.java    |   2 -
 .../hadoop/hbase/client/ScanResultCache.java    |   7 +-
 .../hadoop/hbase/regionserver/HRegion.java      |   2 +-
 .../hbase/regionserver/RSRpcServices.java       | 106 ++++++++---
 .../hbase/regionserver/ScannerContext.java      |   2 +-
 .../hbase/client/ColumnCountOnRowFilter.java    |  58 ++++++
 .../hbase/client/TestLimitedScanWithFilter.java | 177 +++++++++++++++++++
 12 files changed, 414 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
index 82f1ea0..5b6c411 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
@@ -38,13 +39,23 @@ class AllowPartialScanResultCache implements ScanResultCache {
   // beginning of a row when retry.
   private Cell lastCell;
 
-  private void updateLastCell(Result result) {
+  private boolean lastResultPartial;
+
+  private int numberOfCompleteRows;
+
+  private void recordLastResult(Result result) {
     lastCell = result.rawCells()[result.rawCells().length - 1];
+    lastResultPartial = result.mayHaveMoreCellsInRow();
   }
 
   @Override
   public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException
{
     if (results.length == 0) {
+      if (!isHeartbeatMessage && lastResultPartial) {
+        // An empty non-heartbeat result indicates that there must be a row change. So if the
+        // lastResultPartial is true then we need to increase numberOfCompleteRows.
+        numberOfCompleteRows++;
+      }
       return EMPTY_RESULT_ARRAY;
     }
     int i;
@@ -58,16 +69,29 @@ class AllowPartialScanResultCache implements ScanResultCache {
     if (i == results.length) {
       return EMPTY_RESULT_ARRAY;
     }
-    updateLastCell(results[results.length - 1]);
+    if (lastResultPartial && !CellUtil.matchingRow(lastCell, results[0].getRow()))
{
+      // there is a row change, so increase numberOfCompleteRows
+      numberOfCompleteRows++;
+    }
+    recordLastResult(results[results.length - 1]);
     if (i > 0) {
-      return Arrays.copyOfRange(results, i, results.length);
-    } else {
-      return results;
+      results = Arrays.copyOfRange(results, i, results.length);
     }
+    for (Result result : results) {
+      if (!result.mayHaveMoreCellsInRow()) {
+        numberOfCompleteRows++;
+      }
+    }
+    return results;
   }
 
   @Override
   public void clear() {
     // we do not cache anything
   }
+
+  @Override
+  public int numberOfCompleteRows() {
+    return numberOfCompleteRows;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
index 9ab959b..293f411 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
@@ -26,6 +26,7 @@ import java.util.Deque;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -45,19 +46,25 @@ public class BatchScanResultCache implements ScanResultCache {
   // beginning of a row when retry.
   private Cell lastCell;
 
+  private boolean lastResultPartial;
+
   private final Deque<Result> partialResults = new ArrayDeque<>();
 
   private int numCellsOfPartialResults;
 
+  private int numberOfCompleteRows;
+
   public BatchScanResultCache(int batch) {
     this.batch = batch;
   }
 
-  private void updateLastCell(Result result) {
+  private void recordLastResult(Result result) {
     lastCell = result.rawCells()[result.rawCells().length - 1];
+    lastResultPartial = result.mayHaveMoreCellsInRow();
   }
 
   private Result createCompletedResult() throws IOException {
+    numberOfCompleteRows++;
     Result result = Result.createCompleteResult(partialResults);
     partialResults.clear();
     numCellsOfPartialResults = 0;
@@ -104,8 +111,15 @@ public class BatchScanResultCache implements ScanResultCache {
   @Override
   public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException
{
     if (results.length == 0) {
-      if (!partialResults.isEmpty() && !isHeartbeatMessage) {
-        return new Result[] { createCompletedResult() };
+      if (!isHeartbeatMessage) {
+        if (!partialResults.isEmpty()) {
+          return new Result[] { createCompletedResult() };
+        }
+        if (lastResultPartial) {
+          // An empty non-heartbeat result indicates that there must be a row change. So if the
+          // lastResultPartial is true then we need to increase numberOfCompleteRows.
+          numberOfCompleteRows++;
+        }
       }
       return EMPTY_RESULT_ARRAY;
     }
@@ -115,6 +129,17 @@ public class BatchScanResultCache implements ScanResultCache {
       if (result == null) {
         continue;
       }
+      if (!partialResults.isEmpty()) {
+        if (!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
+          // there is a row change
+          regroupedResults.add(createCompletedResult());
+        }
+      } else if (lastResultPartial && !CellUtil.matchingRow(lastCell, result.getRow()))
{
+        // As for batched scan we may return partial results to user if we reach the batch limit, so
+        // here we need to use lastCell to determine if there is a row change and increase
+        // numberOfCompleteRows.
+        numberOfCompleteRows++;
+      }
       // check if we have a row change
       if (!partialResults.isEmpty() &&
           !Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
@@ -122,9 +147,12 @@ public class BatchScanResultCache implements ScanResultCache {
       }
       Result regroupedResult = regroupResults(result);
       if (regroupedResult != null) {
+        if (!regroupedResult.mayHaveMoreCellsInRow()) {
+          numberOfCompleteRows++;
+        }
         regroupedResults.add(regroupedResult);
         // only update last cell when we actually return it to user.
-        updateLastCell(regroupedResult);
+        recordLastResult(regroupedResult);
       }
       if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
         // We are done for this row
@@ -139,4 +167,9 @@ public class BatchScanResultCache implements ScanResultCache {
     partialResults.clear();
     numCellsOfPartialResults = 0;
   }
+
+  @Override
+  public int numberOfCompleteRows() {
+    return numberOfCompleteRows;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index abcb67e..8e94c7c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -18,13 +18,11 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.concurrent.ExecutorService;
 
@@ -465,8 +463,11 @@ public abstract class ClientScanner extends AbstractClientScanner {
       // Groom the array of Results that we received back from the server before adding that
       // Results to the scanner's cache. If partial results are not allowed to be seen by
the
       // caller, all book keeping will be performed within this method.
+      int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows();
       Result[] resultsToAddToCache =
           scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
+      int numberOfCompleteRows =
+          scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
       if (resultsToAddToCache.length > 0) {
         for (Result rs : resultsToAddToCache) {
           cache.add(rs);
@@ -476,12 +477,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
           countdown--;
           this.lastResult = rs;
         }
-        if (scan.getLimit() > 0) {
-          int newLimit =
-              scan.getLimit() - numberOfIndividualRows(Arrays.asList(resultsToAddToCache));
-          assert newLimit >= 0;
-          scan.setLimit(newLimit);
-        }
+      }
+
+      if (scan.getLimit() > 0) {
+        int newLimit = scan.getLimit() - numberOfCompleteRows;
+        assert newLimit >= 0;
+        scan.setLimit(newLimit);
       }
       if (scanExhausted(values)) {
         closeScanner();

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
index e09ddfb..a132642 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Private
 class CompleteScanResultCache implements ScanResultCache {
 
+  private int numberOfCompleteRows;
+
   private final List<Result> partialResults = new ArrayList<>();
 
   private Result combine() throws IOException {
@@ -59,6 +61,11 @@ class CompleteScanResultCache implements ScanResultCache {
     return prependResults;
   }
 
+  private Result[] updateNumberOfCompleteResultsAndReturn(Result... results) {
+    numberOfCompleteRows += results.length;
+    return results;
+  }
+
   @Override
   public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException
{
     // If no results were returned it indicates that either we have the all the partial results
@@ -69,7 +76,7 @@ class CompleteScanResultCache implements ScanResultCache {
       // and thus there may be more partials server side that still need to be added to the
partial
       // list before we form the complete Result
       if (!partialResults.isEmpty() && !isHeartbeatMessage) {
-        return new Result[] { combine() };
+        return updateNumberOfCompleteResultsAndReturn(combine());
       }
       return EMPTY_RESULT_ARRAY;
     }
@@ -79,7 +86,7 @@ class CompleteScanResultCache implements ScanResultCache {
     if (last.mayHaveMoreCellsInRow()) {
       if (partialResults.isEmpty()) {
         partialResults.add(last);
-        return Arrays.copyOf(results, results.length - 1);
+        return updateNumberOfCompleteResultsAndReturn(Arrays.copyOf(results, results.length - 1));
       }
       // We have only one result and it is partial
       if (results.length == 1) {
@@ -90,21 +97,26 @@ class CompleteScanResultCache implements ScanResultCache {
         }
         Result completeResult = combine();
         partialResults.add(last);
-        return new Result[] { completeResult };
+        return updateNumberOfCompleteResultsAndReturn(completeResult);
       }
       // We have some complete results
       Result[] resultsToReturn = prependCombined(results, results.length - 1);
       partialResults.add(last);
-      return resultsToReturn;
+      return updateNumberOfCompleteResultsAndReturn(resultsToReturn);
     }
     if (!partialResults.isEmpty()) {
-      return prependCombined(results, results.length);
+      return updateNumberOfCompleteResultsAndReturn(prependCombined(results, results.length));
     }
-    return results;
+    return updateNumberOfCompleteResultsAndReturn(results);
   }
 
   @Override
   public void clear() {
     partialResults.clear();
   }
+
+  @Override
+  public int numberOfCompleteRows() {
+    return numberOfCompleteRows;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 1bdc5fe..7155659 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -303,29 +302,6 @@ public class ConnectionUtils {
     return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0;
   }
 
-  /**
-   * Count the individual rows for the given result list.
-   * <p>
-   * There are two reason why we need to use this method instead of a simple {@code results.length}.
-   * <ol>
-   * <li>Server may return only part of the whole cells of a row for the last result,
and if
-   * allowPartial is true, we will return the array to user directly. We should not count
the last
-   * result.</li>
-   * <li>If this is a batched scan, a row may be split into several results, but they
should be
-   * counted as one row. For example, a row with 15 cells will be split into 3 results with
5 cells
-   * each if {@code scan.getBatch()} is 5.</li>
-   * </ol>
-   */
-  public static int numberOfIndividualRows(List<Result> results) {
-    int count = 0;
-    for (Result result : results) {
-      if (!result.mayHaveMoreCellsInRow()) {
-        count++;
-      }
-    }
-    return count;
-  }
-
   public static ScanResultCache createScanResultCache(Scan scan) {
     if (scan.getAllowPartialResults()) {
       return new AllowPartialScanResultCache();

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 9ce40e8..26076f5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -1149,8 +1149,6 @@ public class Scan extends Query {
    * reaches this value.
    * <p>
    * This condition will be tested at last, after all other conditions such as stopRow, filter,
etc.
-   * <p>
-   * Can not be used together with batch and allowPartial.
    * @param limit the limit of rows for this scan
    * @return this
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
index 2366b57..2d28e1a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  * <ol>
  * <li>Get results from ScanResponse proto.</li>
  * <li>Pass them to ScanResultCache and get something back.</li>
- * <li>If we actually get something back, then pass it to ScanObserver.</li>
+ * <li>If we actually get something back, then pass it to ScanConsumer.</li>
  * </ol>
  */
 @InterfaceAudience.Private
@@ -50,4 +50,9 @@ interface ScanResultCache {
    * again.
    */
   void clear();
+
+  /**
+   * Return the number of complete rows. Used to implement limited scan.
+   */
+  int numberOfCompleteRows();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2ba1ce9..e7e65a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6076,7 +6076,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver,
Regi
       // If the size limit was reached it means a partial Result is being returned. Returning
a
       // partial Result means that we should not reset the filters; filters should only be
reset in
       // between rows
-      if (!scannerContext.hasMoreCellsInRow()) {
+      if (!scannerContext.mayHaveMoreCellsInRow()) {
         resetFilters();
       }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 49ce348..27f5420 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -274,6 +274,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     private final String scannerName;
     private final RegionScanner s;
     private final Region r;
+    private byte[] rowOfLastPartialResult;
 
     public RegionScannerHolder(String scannerName, RegionScanner s, Region r) {
       this.scannerName = scannerName;
@@ -2558,9 +2559,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return -1L;
   }
 
+  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
+      ScannerContext scannerContext, ScanResponse.Builder builder) {
+    if (numOfCompleteRows >= limitOfRows) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows +
+            " scannerContext: " + scannerContext);
+      }
+      builder.setMoreResults(false);
+    }
+  }
+
   // return whether we have more results in region.
-  private boolean scan(HBaseRpcController controller, ScanRequest request,
-      RegionScannerHolder rsh, long maxQuotaResultSize, int maxResults, List<Result>
results,
+  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder
rsh,
+      long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
       ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context)
       throws IOException {
     Region region = rsh.r;
@@ -2577,7 +2589,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     List<Cell> values = new ArrayList<Cell>(32);
     region.startRegionOperation(Operation.SCAN);
     try {
-      int i = 0;
+      int numOfResults = 0;
+      int numOfCompleteRows = 0;
       long before = EnvironmentEdgeManager.currentTime();
       synchronized (scanner) {
         boolean stale = (region.getRegionInfo().getReplicaId() != 0);
@@ -2622,7 +2635,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         contextBuilder.setTrackMetrics(trackMetrics);
         ScannerContext scannerContext = contextBuilder.build();
         boolean limitReached = false;
-        while (i < maxResults) {
+        while (numOfResults < maxResults) {
           // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
           // batch limit is a limit on the number of cells per Result. Thus, if progress
is
           // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
@@ -2634,16 +2647,46 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           moreRows = scanner.nextRaw(values, scannerContext);
 
           if (!values.isEmpty()) {
-            Result r = Result.create(values, null, stale, scannerContext.hasMoreCellsInRow());
+            if (limitOfRows > 0) {
+              // First we need to check if the last result is partial and we have a row change. If
+              // so then we need to increase the numOfCompleteRows.
+              if (results.isEmpty()) {
+                if (rsh.rowOfLastPartialResult != null &&
+                    !CellUtil.matchingRow(values.get(0), rsh.rowOfLastPartialResult)) {
+                  numOfCompleteRows++;
+                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
+                    builder);
+                }
+              } else {
+                Result lastResult = results.get(results.size() - 1);
+                if (lastResult.mayHaveMoreCellsInRow() &&
+                    !CellUtil.matchingRow(values.get(0), lastResult.getRow())) {
+                  numOfCompleteRows++;
+                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
+                    builder);
+                }
+              }
+              if (builder.hasMoreResults() && !builder.getMoreResults()) {
+                break;
+              }
+            }
+            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
+            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
             lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
             results.add(r);
-            i++;
+            numOfResults++;
+            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
+              numOfCompleteRows++;
+              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
builder);
+              if (builder.hasMoreResults() && !builder.getMoreResults()) {
+                break;
+              }
+            }
           }
-
           boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
           boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
-          boolean rowLimitReached = i >= maxResults;
-          limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
+          boolean resultsLimitReached = numOfResults >= maxResults;
+          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
 
           if (limitReached || !moreRows) {
             if (LOG.isTraceEnabled()) {
@@ -2669,7 +2712,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           // We didn't get a single batch
           builder.setMoreResultsInRegion(false);
         }
-
         // Check to see if the client requested that we track metrics server side. If the
         // client requested metrics, retrieve the metrics from the scanner context.
         if (trackMetrics) {
@@ -2686,7 +2728,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           builder.setScanMetrics(metricBuilder.build());
         }
       }
-      region.updateReadRequestsCount(i);
+      region.updateReadRequestsCount(numOfResults);
       long end = EnvironmentEdgeManager.currentTime();
       long responseCellSize = context != null ? context.getResponseCellSize() : 0;
       region.getMetrics().updateScanTime(end - before);
@@ -2701,7 +2743,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     if (region.getCoprocessorHost() != null) {
       region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
     }
-    return builder.getMoreResultsInRegion();
   }
 
   /**
@@ -2809,14 +2850,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     // now let's do the real scan.
     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
     RegionScanner scanner = rsh.s;
-    boolean moreResults = true;
-    boolean moreResultsInRegion = true;
     // this is the limit of rows for this scan, if we the number of rows reach this value,
we will
     // close the scanner.
     int limitOfRows;
     if (request.hasLimitOfRows()) {
       limitOfRows = request.getLimitOfRows();
-      rows = Math.min(rows, limitOfRows);
     } else {
       limitOfRows = -1;
     }
@@ -2839,32 +2877,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
         }
         if (!done) {
-          moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh,
-            maxQuotaResultSize, rows, results, builder, lastBlock, context);
+          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
+            results, builder, lastBlock, context);
         }
       }
 
       quota.addScanResult(results);
-
+      addResults(builder, results, (HBaseRpcController) controller,
+        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
       if (scanner.isFilterDone() && results.isEmpty()) {
         // If the scanner's filter - if any - is done with the scan
         // only set moreResults to false if the results is empty. This is used to keep compatible
         // with the old scan implementation where we just ignore the returned results if
moreResults
         // is false. Can remove the isEmpty check after we get rid of the old implementation.
-        moreResults = false;
-      } else if (limitOfRows > 0 && !results.isEmpty() &&
-          !results.get(results.size() - 1).mayHaveMoreCellsInRow() &&
-          ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) {
-        // if we have reached the limit of rows
-        moreResults = false;
+        builder.setMoreResults(false);
+      }
+      // we only set moreResults to false in the above code, so set it to true if we haven't set it
+      // yet.
+      if (!builder.hasMoreResults()) {
+        builder.setMoreResults(true);
+      }
+      if (builder.getMoreResults() && builder.getMoreResultsInRegion() &&
!results.isEmpty()) {
+        // Record the row of the last result if it is a partial result.
+        // We need this to calculate the complete rows we have returned to client, as
+        // mayHaveMoreCellsInRow being true does not mean that there will be extra cells for the
+        // current row. We may filter out all the remaining cells for the current row and just
+        // return the cells of the next row when calling RegionScanner.nextRaw. So here we need to
+        // check for row change.
+        Result lastResult = results.get(results.size() - 1);
+        if (lastResult.mayHaveMoreCellsInRow()) {
+          rsh.rowOfLastPartialResult = lastResult.getRow();
+        } else {
+          rsh.rowOfLastPartialResult = null;
+        }
       }
-      addResults(builder, results, (HBaseRpcController) controller,
-        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
-      if (!moreResults || !moreResultsInRegion || closeScanner) {
+      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner)
{
         scannerClosed = true;
         closeScanner(region, scanner, scannerName, context);
       }
-      builder.setMoreResults(moreResults);
       return builder.build();
     } catch (Exception e) {
       try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 67b2693..2c5fd01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -221,7 +221,7 @@ public class ScannerContext {
    * @return true when we have more cells for the current row. This usually because we have
reached
    *         a limit in the middle of a row
    */
-  boolean hasMoreCellsInRow() {
+  boolean mayHaveMoreCellsInRow() {
     return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW ||
         scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW ||
         scannerState == NextState.BATCH_LIMIT_REACHED;

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ColumnCountOnRowFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ColumnCountOnRowFilter.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ColumnCountOnRowFilter.java
new file mode 100644
index 0000000..c4b4d28
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ColumnCountOnRowFilter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+@InterfaceAudience.Private
+public final class ColumnCountOnRowFilter extends FilterBase {
+
+  private final int limit;
+
+  private int count = 0;
+
+  public ColumnCountOnRowFilter(int limit) {
+    this.limit = limit;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(Cell v) throws IOException {
+    count++;
+    return count > limit ? ReturnCode.NEXT_ROW : ReturnCode.INCLUDE;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    this.count = 0;
+  }
+
+  @Override
+  public byte[] toByteArray() throws IOException {
+    return Bytes.toBytes(limit);
+  }
+
+  public static ColumnCountOnRowFilter parseFrom(byte[] bytes) throws DeserializationException {
+    return new ColumnCountOnRowFilter(Bytes.toInt(bytes));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/849ab5ff/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLimitedScanWithFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLimitedScanWithFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLimitedScanWithFilter.java
new file mode 100644
index 0000000..f702e3d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLimitedScanWithFilter.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * With filter we may stop at a middle of row and think that we still have more cells for the
+ * current row but actually all the remaining cells will be filtered out by the filter. So it will
+ * lead to a Result that mayHaveMoreCellsInRow is true but actually there are no cells for the same
+ * row. Here we want to test if our limited scan still works.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestLimitedScanWithFilter {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("TestRegionScanner");
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static final byte[][] CQS =
+      { Bytes.toBytes("cq1"), Bytes.toBytes("cq2"), Bytes.toBytes("cq3"), Bytes.toBytes("cq4") };
+
+  private static int ROW_COUNT = 10;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
+      for (int i = 0; i < ROW_COUNT; i++) {
+        Put put = new Put(Bytes.toBytes(i));
+        for (int j = 0; j < CQS.length; j++) {
+          put.addColumn(FAMILY, CQS[j], Bytes.toBytes((j + 1) * i));
+        }
+        table.put(put);
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCompleteResult() throws IOException {
+    int limit = 5;
+    Scan scan =
+        new Scan().setFilter(new ColumnCountOnRowFilter(2)).setMaxResultSize(1).setLimit(limit);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME);
+        ResultScanner scanner = table.getScanner(scan)) {
+      for (int i = 0; i < limit; i++) {
+        Result result = scanner.next();
+        assertEquals(i, Bytes.toInt(result.getRow()));
+        assertEquals(2, result.size());
+        assertFalse(result.mayHaveMoreCellsInRow());
+        assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQS[0])));
+        assertEquals(2 * i, Bytes.toInt(result.getValue(FAMILY, CQS[1])));
+      }
+      assertNull(scanner.next());
+    }
+  }
+
+  @Test
+  public void testAllowPartial() throws IOException {
+    int limit = 5;
+    Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(2)).setMaxResultSize(1)
+        .setAllowPartialResults(true).setLimit(limit);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME);
+        ResultScanner scanner = table.getScanner(scan)) {
+      for (int i = 0; i < 2 * limit; i++) {
+        int key = i / 2;
+        Result result = scanner.next();
+        assertEquals(key, Bytes.toInt(result.getRow()));
+        assertEquals(1, result.size());
+        assertTrue(result.mayHaveMoreCellsInRow());
+        int cqIndex = i % 2;
+        assertEquals(key * (cqIndex + 1), Bytes.toInt(result.getValue(FAMILY, CQS[cqIndex])));
+      }
+      assertNull(scanner.next());
+    }
+  }
+
+  @Test
+  public void testBatchAllowPartial() throws IOException {
+    int limit = 5;
+    Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(3)).setBatch(2).setMaxResultSize(1)
+        .setAllowPartialResults(true).setLimit(limit);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME);
+        ResultScanner scanner = table.getScanner(scan)) {
+      for (int i = 0; i < 3 * limit; i++) {
+        int key = i / 3;
+        Result result = scanner.next();
+        assertEquals(key, Bytes.toInt(result.getRow()));
+        assertEquals(1, result.size());
+        assertTrue(result.mayHaveMoreCellsInRow());
+        int cqIndex = i % 3;
+        assertEquals(key * (cqIndex + 1), Bytes.toInt(result.getValue(FAMILY, CQS[cqIndex])));
+      }
+      assertNull(scanner.next());
+    }
+  }
+
+  @Test
+  public void testBatch() throws IOException {
+    int limit = 5;
+    Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(2)).setBatch(2).setMaxResultSize(1)
+        .setLimit(limit);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME);
+        ResultScanner scanner = table.getScanner(scan)) {
+      for (int i = 0; i < limit; i++) {
+        Result result = scanner.next();
+        assertEquals(i, Bytes.toInt(result.getRow()));
+        assertEquals(2, result.size());
+        assertTrue(result.mayHaveMoreCellsInRow());
+        assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQS[0])));
+        assertEquals(2 * i, Bytes.toInt(result.getValue(FAMILY, CQS[1])));
+      }
+      assertNull(scanner.next());
+    }
+  }
+
+  @Test
+  public void testBatchAndFilterDiffer() throws IOException {
+    int limit = 5;
+    Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(3)).setBatch(2).setMaxResultSize(1)
+        .setLimit(limit);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME);
+        ResultScanner scanner = table.getScanner(scan)) {
+      for (int i = 0; i < limit; i++) {
+        Result result = scanner.next();
+        assertEquals(i, Bytes.toInt(result.getRow()));
+        assertEquals(2, result.size());
+        assertTrue(result.mayHaveMoreCellsInRow());
+        assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQS[0])));
+        assertEquals(2 * i, Bytes.toInt(result.getValue(FAMILY, CQS[1])));
+        result = scanner.next();
+        assertEquals(i, Bytes.toInt(result.getRow()));
+        assertEquals(1, result.size());
+        assertFalse(result.mayHaveMoreCellsInRow());
+        assertEquals(3 * i, Bytes.toInt(result.getValue(FAMILY, CQS[2])));
+      }
+      assertNull(scanner.next());
+    }
+  }
+}


Mime
View raw message