hbase-commits mailing list archives

From st...@apache.org
Subject [3/5] hbase git commit: HBASE-11544: [Ergonomics] hbase.client.scanner.caching is dogged and will try to return batch even if it means OOME
Date Thu, 05 Mar 2015 01:35:03 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
index 41708c0..e68dc75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
@@ -22,8 +22,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
  * Internal scanners differ from client-side scanners in that they operate on
@@ -42,22 +42,213 @@ import org.apache.hadoop.hbase.Cell;
 @InterfaceAudience.Private
 public interface InternalScanner extends Closeable {
   /**
+   * This class encapsulates all the meaningful state information that we would like to know about
+   * after a call to {@link InternalScanner#next(List)}. While this is not an enum, a restriction on
+   * the possible states is implied through the exposed {@link #makeState(State)} method.
+   */
+  public static class NextState {
+    /**
+     * The possible states we want to restrict ourselves to. This enum is not sufficient to
+     * encapsulate all of the state information since some of the fields of the state must be
+     * dynamic (e.g. resultSize).
+     */
+    public enum State {
+      MORE_VALUES(true),
+      NO_MORE_VALUES(false),
+      SIZE_LIMIT_REACHED(true),
+      BATCH_LIMIT_REACHED(true);
+
+      private boolean moreValues;
+
+      private State(final boolean moreValues) {
+        this.moreValues = moreValues;
+      }
+
+      /**
+       * @return true when the state indicates that more values may follow those that have been
+       *         returned
+       */
+      public boolean hasMoreValues() {
+        return this.moreValues;
+      }
+    }
+
+    /**
+     * state variables
+     */
+    private final State state;
+    private long resultSize;
+
+    /**
+     * Value to use for resultSize when the size has not been calculated. Must be a negative number
+     * so that {@link NextState#hasResultSizeEstimate()} returns false.
+     */
+    private static final long DEFAULT_RESULT_SIZE = -1;
+
+    private NextState(State state, long resultSize) {
+      this.state = state;
+      this.resultSize = resultSize;
+    }
+
+    /**
+     * @param state
+     * @return An instance of {@link NextState} where the size of the results returned from the call
+   *         to {@link InternalScanner#next(List)} is unknown. It is the responsibility of the
+     *         caller of {@link InternalScanner#next(List)} to calculate the result size if needed
+     */
+    public static NextState makeState(final State state) {
+      return makeState(state, DEFAULT_RESULT_SIZE);
+    }
+
+    /**
+     * @param state
+     * @param resultSize
+     * @return An instance of {@link NextState} where the size of the values returned from the call
+     *         to {@link InternalScanner#next(List)} is known. The caller can avoid recalculating
+     *         the result size by using the cached value retrievable via {@link #getResultSize()}
+     */
+    public static NextState makeState(final State state, long resultSize) {
+      switch (state) {
+      case MORE_VALUES:
+        return createMoreValuesState(resultSize);
+      case NO_MORE_VALUES:
+        return createNoMoreValuesState(resultSize);
+      case BATCH_LIMIT_REACHED:
+        return createBatchLimitReachedState(resultSize);
+      case SIZE_LIMIT_REACHED:
+        return createSizeLimitReachedState(resultSize);
+      default:
+        // If the state is not recognized, default to the no-more-values state
+        return createNoMoreValuesState(resultSize);
+      }
+    }
+
+    /**
+     * Convenience method for creating a state that indicates that more values can be scanned
+     * @param resultSize estimate of the size (heap size) of the values returned from the call to
+     *          {@link InternalScanner#next(List)}
+     */
+    private static NextState createMoreValuesState(long resultSize) {
+      return new NextState(State.MORE_VALUES, resultSize);
+    }
+
+    /**
+     * Convenience method for creating a state that indicates that no more values can be scanned.
+     * @param resultSize estimate of the size (heap size) of the values returned from the call to
+     *          {@link InternalScanner#next(List)}
+     */
+    private static NextState createNoMoreValuesState(long resultSize) {
+      return new NextState(State.NO_MORE_VALUES, resultSize);
+    }
+
+    /**
+     * Convenience method for creating a state that indicates that the scan stopped because the
+     * batch limit was exceeded
+     * @param resultSize estimate of the size (heap size) of the values returned from the call to
+     *          {@link InternalScanner#next(List)}
+     */
+    private static NextState createBatchLimitReachedState(long resultSize) {
+      return new NextState(State.BATCH_LIMIT_REACHED, resultSize);
+    }
+
+    /**
+     * Convenience method for creating a state that indicates that the scan stopped due to the size
+     * limit
+     * @param resultSize estimate of the size (heap size) of the values returned from the call to
+     *          {@link InternalScanner#next(List)}
+     */
+    private static NextState createSizeLimitReachedState(long resultSize) {
+      return new NextState(State.SIZE_LIMIT_REACHED, resultSize);
+    }
+
+    /**
+     * @return true when the scanner has more values to be scanned following the values returned by
+     *         the call to {@link InternalScanner#next(List)}
+     */
+    public boolean hasMoreValues() {
+      return this.state.hasMoreValues();
+    }
+
+    /**
+     * @return true when the scanner had to stop scanning because it reached the batch limit
+     */
+    public boolean batchLimitReached() {
+      return this.state == State.BATCH_LIMIT_REACHED;
+    }
+
+    /**
+     * @return true when the scanner had to stop scanning because it reached the size limit
+     */
+    public boolean sizeLimitReached() {
+      return this.state == State.SIZE_LIMIT_REACHED;
+    }
+
+    /**
+     * @return The size (heap size) of the values that were returned from the call to
+     *         {@link InternalScanner#next(List)}. This value should only be used if
+     *         {@link #hasResultSizeEstimate()} returns true.
+     */
+    public long getResultSize() {
+      return resultSize;
+    }
+
+    /**
+     * @return true when an estimate for the size of the values returned by
+     *         {@link InternalScanner#next(List)} was provided. If false, it is the responsibility
+     *         of the caller to calculate the result size
+     */
+    public boolean hasResultSizeEstimate() {
+      return resultSize >= 0;
+    }
+
+    /**
+     * Helper method to centralize all checks as to whether or not the state is valid.
+     * @param state
+     * @return true when the state is valid
+     */
+    public static boolean isValidState(NextState state) {
+      return state != null;
+    }
+
+    /**
+     * @param state
+     * @return true when the state is non null and indicates that more values exist
+     */
+    public static boolean hasMoreValues(NextState state) {
+      return state != null && state.hasMoreValues();
+    }
+  }
+
+  /**
    * Grab the next row's worth of values.
    * @param results return output array
-   * @return true if more rows exist after this one, false if scanner is done
+   * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
+   *         one, false if scanner is done
+   * @throws IOException e
+   */
+  NextState next(List<Cell> results) throws IOException;
+
+  /**
+   * Grab the next row's worth of values with a limit on the number of values to return.
+   * @param result return output array
+   * @param limit limit on row count to get
+   * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
+   *         one, false if scanner is done
    * @throws IOException e
    */
-  boolean next(List<Cell> results) throws IOException;
+  NextState next(List<Cell> result, int limit) throws IOException;
 
   /**
-   * Grab the next row's worth of values with a limit on the number of values
-   * to return.
+   * Grab the next row's worth of values with a limit on the number of values to return as well as a
+   * restriction on the size of the list of values that are returned.
    * @param result return output array
    * @param limit limit on row count to get
-   * @return true if more rows exist after this one, false if scanner is done
+   * @param remainingResultSize limit on the size of the result being returned
+   * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
+   *         one, false if scanner is done
    * @throws IOException e
    */
-  boolean next(List<Cell> result, int limit) throws IOException;
+  NextState next(List<Cell> result, int limit, long remainingResultSize) throws IOException;
 
   /**
    * Closes the scanner and releases any resources it has allocated

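For orientation, here is a minimal consumer sketch (not part of the patch) showing how a caller might drive the new NextState-based contract defined above. The scanner instance is assumed to come from elsewhere, and -1 for both limits means "no limit", per the diff.

// Assumes the imports used throughout this patch (java.util.*, java.io.IOException,
// org.apache.hadoop.hbase.Cell, InternalScanner and its nested NextState).
static void drainScanner(InternalScanner scanner) throws IOException {
  List<Cell> cells = new ArrayList<Cell>();
  boolean more = true;
  while (more) {
    cells.clear();
    NextState state = scanner.next(cells, -1, -1); // no row-count or size limit
    if (!NextState.isValidState(state)) {
      throw new IOException("InternalScanner returned an invalid state");
    }
    // ... consume cells here ...
    more = NextState.hasMoreValues(state); // null-safe; false ends the loop
  }
}
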
http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index afc4e4e..beb23cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -24,9 +24,9 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
  * Implements a heap merge across any number of KeyValueScanners.
@@ -125,18 +125,29 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
    * <p>
    * This method takes care of updating the heap.
    * <p>
-   * This can ONLY be called when you are using Scanners that implement
-   * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
+   * KeyValueScanner (a {@link StoreScanner}).
    * @param result
    * @param limit
-   * @return true if there are more keys, false if all scanners are done
+   * @return state where NextState#hasMoreValues() is true if more keys exist after this
+   *         one, false if scanner is done
    */
-  public boolean next(List<Cell> result, int limit) throws IOException {
+  public NextState next(List<Cell> result, int limit) throws IOException {
+    return next(result, limit, -1);
+  }
+
+  public NextState next(List<Cell> result, int limit, long remainingResultSize) throws IOException {
     if (this.current == null) {
-      return false;
+      return NextState.makeState(NextState.State.NO_MORE_VALUES);
     }
     InternalScanner currentAsInternal = (InternalScanner)this.current;
-    boolean mayContainMoreRows = currentAsInternal.next(result, limit);
+    NextState state = currentAsInternal.next(result, limit, remainingResultSize);
+    // Invalid states should never be returned. Receiving an invalid state means that we have
+    // no clue how to proceed. Throw an exception.
+    if (!NextState.isValidState(state)) {
+      throw new IOException("Invalid state returned from InternalScanner#next");
+    }
+    boolean mayContainMoreRows = NextState.hasMoreValues(state);
     Cell pee = this.current.peek();
     /*
      * By definition, any InternalScanner must return false only when it has no
@@ -151,7 +162,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
       this.heap.add(this.current);
     }
     this.current = pollRealKV();
-    return (this.current != null);
+    if (this.current == null) {
+      state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize());
+    }
+    return state;
   }
 
   /**
@@ -159,12 +173,13 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
    * <p>
    * This method takes care of updating the heap.
    * <p>
-   * This can ONLY be called when you are using Scanners that implement
-   * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
+   * KeyValueScanner (a {@link StoreScanner}).
    * @param result
-   * @return true if there are more keys, false if all scanners are done
+   * @return state where NextState#hasMoreValues() is true if more keys exist after this
+   *         one, false if scanner is done
    */
-  public boolean next(List<Cell> result) throws IOException {
+  public NextState next(List<Cell> result) throws IOException {
     return next(result, -1);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index dff79f7..3ce1b57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -150,6 +150,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
 import org.apache.hadoop.hbase.quotas.OperationQuota;
 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
@@ -339,6 +340,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     if (isClientCellBlockSupport()) {
       for (Result res : results) {
         builder.addCellsPerResult(res.size());
+        builder.addPartialFlagPerResult(res.isPartial());
       }
       ((PayloadCarryingRpcController)controller).
         setCellScanner(CellUtil.createCellScanner(results));
@@ -2040,6 +2042,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       RegionScannerHolder rsh = null;
       boolean moreResults = true;
       boolean closeScanner = false;
+      boolean isSmallScan = false;
       ScanResponse.Builder builder = ScanResponse.newBuilder();
       if (request.hasCloseScanner()) {
         closeScanner = request.getCloseScanner();
@@ -2071,6 +2074,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         if (!isLoadingCfsOnDemandSet) {
           scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
         }
+
+        isSmallScan = scan.isSmall();
         region.prepareScanner(scan);
         if (region.getCoprocessorHost() != null) {
           scanner = region.getCoprocessorHost().preScannerOpen(scan);
@@ -2111,9 +2116,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           // Remove lease while its being processed in server; protects against case
           // where processing of request takes > lease expiration time.
           lease = regionServer.leases.removeLease(scannerName);
-          List<Result> results = new ArrayList<Result>(rows);
-          long currentScanResultSize = 0;
+          List<Result> results = new ArrayList<Result>();
           long totalCellSize = 0;
+          long currentScanResultSize = 0;
 
           boolean done = false;
           // Call coprocessor. Get region info from scanner.
@@ -2123,8 +2128,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             if (!results.isEmpty()) {
               for (Result r : results) {
                 for (Cell cell : r.rawCells()) {
-                  currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell);
                   totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
+                  currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell);
                 }
               }
             }
@@ -2144,23 +2149,60 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
               int i = 0;
               synchronized(scanner) {
                 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
+                boolean clientHandlesPartials =
+                    request.hasClientHandlesPartials() && request.getClientHandlesPartials();
+
+                // On the server side we must ensure that the correct ordering of partial results is
+                // returned to the client to allow them to properly reconstruct the partial results.
+                // If the coprocessor host is adding to the result list, we cannot guarantee the
+                // correct ordering of partial results and so we prevent partial results from being
+                // formed.
+                boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
+                boolean enforceMaxResultSizeAtCellLevel =
+                    clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
+
                 while (i < rows) {
-                  // Stop collecting results if maxScannerResultSize is set and we have exceeded it
-                  if ((maxScannerResultSize < Long.MAX_VALUE) &&
-                      (currentScanResultSize >= maxResultSize)) {
+                  // Stop collecting results if we have exceeded maxResultSize
+                  if (currentScanResultSize >= maxResultSize) {
                     break;
                   }
+
+                  // A negative remainingResultSize communicates that there is no limit on the size
+                  // of the results.
+                  final long remainingResultSize =
+                      enforceMaxResultSizeAtCellLevel ? maxResultSize - currentScanResultSize
+                          : -1;
+
                   // Collect values to be returned here
-                  boolean moreRows = scanner.nextRaw(values);
+                  NextState state =
+                      scanner.nextRaw(values, scanner.getBatch(), remainingResultSize);
+                  // Invalid states should never be returned. If one is seen, throw exception
+                  // to stop the scan -- We have no way of telling how we should proceed
+                  if (!NextState.isValidState(state)) {
+                    throw new IOException("NextState returned from call to nextRaw was invalid");
+                  }
                   if (!values.isEmpty()) {
+                    // The state should always contain an estimate of the result size because that
+                    // estimate must be used to decide when partial results are formed.
+                    boolean skipResultSizeCalculation = state.hasResultSizeEstimate();
+                    if (skipResultSizeCalculation) currentScanResultSize += state.getResultSize();
+
                     for (Cell cell : values) {
-                      currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell);
                       totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
+
+                      // If the calculation can't be skipped, then do it now.
+                      if (!skipResultSizeCalculation) {
+                        currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell);
+                      }
                     }
-                    results.add(Result.create(values, null, stale));
+                    // The size limit was reached. This means there are more cells remaining in
+                    // the row but we had to stop because we exceeded our max result size. This
+                    // indicates that we are returning a partial result
+                    final boolean partial = state != null && state.sizeLimitReached();
+                    results.add(Result.create(values, null, stale, partial));
                     i++;
                   }
-                  if (!moreRows) {
+                  if (!NextState.hasMoreValues(state)) {
                     break;
                   }
                   values.clear();

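The crux of the RSRpcServices change above is deciding when cell-level size enforcement, and therefore partial-result formation, is allowed. A hedged restatement of that gating logic in isolation; the names mirror the diff, while the concrete values are placeholders:

boolean clientHandlesPartials = true;            // taken from the ScanRequest in the diff
boolean serverGuaranteesOrderOfPartials = true;  // i.e. coprocessors added no results yet
boolean isSmallScan = false;                     // small scans never produce partials
long maxResultSize = 2L * 1024 * 1024;           // placeholder result-size cap
long currentScanResultSize = 0;                  // heap size accumulated so far

boolean enforceMaxResultSizeAtCellLevel =
    clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
// A negative remainingResultSize tells the lower scanner layers "no size limit".
long remainingResultSize =
    enforceMaxResultSizeAtCellLevel ? maxResultSize - currentScanResultSize : -1;
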
http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index ec68dc7..26f9aef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -21,11 +21,11 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * RegionScanner describes iterators over rows in an HRegion.
@@ -68,25 +68,43 @@ public interface RegionScanner extends InternalScanner {
   long getMvccReadPoint();
 
   /**
-   * Grab the next row's worth of values with the default limit on the number of values
-   * to return.
+   * @return The limit on the number of cells to retrieve on each call to next(). See
+   *         {@link org.apache.hadoop.hbase.client.Scan#setBatch(int)}
+   */
+  int getBatch();
+
+  /**
+   * Grab the next row's worth of values with the default limit on the number of values to return.
    * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
-   * Caller must set the thread's readpoint, start and close a region operation, an synchronize on the scanner object.
-   * Caller should maintain and update metrics.
-   * See {@link #nextRaw(List, int)}
+   * Caller must set the thread's readpoint, start and close a region operation, and synchronize on
+   * the scanner object. Caller should maintain and update metrics. See
+   * {@link #nextRaw(List, int, long)}
    * @param result return output array
-   * @return true if more rows exist after this one, false if scanner is done
+   * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
+   *         scanner is done.
    * @throws IOException e
    */
-  boolean nextRaw(List<Cell> result) throws IOException;
+  NextState nextRaw(List<Cell> result) throws IOException;
 
   /**
-   * Grab the next row's worth of values with a limit on the number of values
-   * to return.
+   * Grab the next row's worth of values with a limit on the number of values to return.
    * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
-   * Caller must set the thread's readpoint, start and close a region operation, an synchronize on the scanner object.
-   * Example:
-   * <code><pre>
+   * Caller must set the thread's readpoint, start and close a region operation, and synchronize on
+   * the scanner object. Caller should maintain and update metrics. See
+   * {@link #nextRaw(List, int, long)}
+   * @param result return output array
+   * @param limit limit on row count to get
+   * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
+   *         scanner is done.
+   * @throws IOException e
+   */
+  NextState nextRaw(List<Cell> result, int limit) throws IOException;
+
+  /**
+   * Grab the next row's worth of values with a limit on the number of values to return as well as a
+   * limit on the heap size of those values. This is a special internal method to be called from
+   * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and
+   * close a region operation, and synchronize on the scanner object. Example: <code><pre>
    * HRegion region = ...;
    * RegionScanner scanner = ...
    * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
@@ -103,8 +121,12 @@ public interface RegionScanner extends InternalScanner {
    * </pre></code>
    * @param result return output array
    * @param limit limit on row count to get
-   * @return true if more rows exist after this one, false if scanner is done
+   * @param remainingResultSize the space remaining within the restriction on the result size.
+   *          Negative values indicate no limit
+   * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
+   *         scanner is done.
    * @throws IOException e
    */
-  boolean nextRaw(List<Cell> result, int limit) throws IOException;
+  NextState nextRaw(List<Cell> result, int limit, final long remainingResultSize)
+      throws IOException;
 }

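Since the embedded javadoc example predates the signature change, here is a hedged helper (hypothetical, not part of the patch) showing how coprocessor-style code might call the new three-argument nextRaw together with getBatch(), as RSRpcServices does above:

// Per the javadoc, the real caller must also set the thread's readpoint, hold a
// region operation, and synchronize on the scanner; omitted here for brevity.
static List<Cell> readOneRowRaw(RegionScanner scanner, long sizeBudget) throws IOException {
  List<Cell> out = new ArrayList<Cell>();
  NextState state = scanner.nextRaw(out, scanner.getBatch(), sizeBudget);
  if (!NextState.isValidState(state)) {
    throw new IOException("nextRaw returned an invalid state");
  }
  return out; // may be a partial row when sizeBudget was exhausted mid-row
}
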
http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index ca34164..831673d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -23,13 +23,14 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 
 /**
@@ -112,7 +113,7 @@ abstract class StoreFlusher {
     List<Cell> kvs = new ArrayList<Cell>();
     boolean hasMore;
     do {
-      hasMore = scanner.next(kvs, compactionKVMax);
+      hasMore = NextState.hasMoreValues(scanner.next(kvs, compactionKVMax));
       if (!kvs.isEmpty()) {
         for (Cell c : kvs) {
           // If we know that this KV is going to be included always, then let us

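This one-line change is the migration idiom repeated at every legacy call site in the patch: callers that only need the boolean unwrap the returned state through the null-safe static helper. Shown in isolation for clarity:

// Before: hasMore = scanner.next(kvs, compactionKVMax);
// After:  the NextState is reduced back to a boolean, treating null as "done".
boolean hasMore = NextState.hasMoreValues(scanner.next(kvs, compactionKVMax));
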
http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 47e5360..16d3fc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -30,7 +30,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -449,24 +449,38 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    * @return true if there are more rows, false if scanner is done
    */
   @Override
-  public boolean next(List<Cell> outResult, int limit) throws IOException {
+  public NextState next(List<Cell> outResult, int limit) throws IOException {
+    // -1 means no limit
+    return next(outResult, limit, -1);
+  }
+
+  /**
+   * Get the next row of values from this Store.
+   * @param outResult
+   * @param limit
+   * @param remainingResultSize
+   * @return state where NextState#hasMoreValues() is true if more rows exist, false if scanner is done
+   */
+  @Override
+  public NextState next(List<Cell> outResult, int limit, long remainingResultSize)
+      throws IOException {
     lock.lock();
     try {
     if (checkReseek()) {
-      return true;
+      return NextState.makeState(NextState.State.MORE_VALUES, 0);
     }
 
     // if the heap was left null, then the scanners had previously run out anyways, close and
     // return.
     if (this.heap == null) {
       close();
-      return false;
+      return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
     }
 
     Cell peeked = this.heap.peek();
     if (peeked == null) {
       close();
-      return false;
+      return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
     }
 
     // only call setRow if the row changes; avoids confusing the query matcher
@@ -474,10 +488,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     byte[] row = peeked.getRowArray();
     int offset = peeked.getRowOffset();
     short length = peeked.getRowLength();
-    if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
-        matcher.rowOffset, matcher.rowLength)) {
-      this.countPerRow = 0;
-      matcher.setRow(row, offset, length);
+
+    // If limit < 0 and remainingResultSize < 0 we can skip the row comparison because we know
+    // the row has changed. Else it is possible we are still traversing the same row so we
+    // must perform the row comparison.
+      if ((limit < 0 && remainingResultSize < 0) || matcher.row == null
+          || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset,
+            matcher.rowLength)) {
+        this.countPerRow = 0;
+        matcher.setRow(row, offset, length);
     }
 
     Cell cell;
@@ -488,6 +507,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
     int count = 0;
     long totalBytesRead = 0;
+      long totalHeapSize = 0;
 
     LOOP: while((cell = this.heap.peek()) != null) {
       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
@@ -512,7 +532,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
               this.countPerRow > (storeLimit + storeOffset)) {
             // do what SEEK_NEXT_ROW does.
             if (!matcher.moreRowsMayExistAfter(cell)) {
-              return false;
+              return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
             }
             seekToNextRow(cell);
             break LOOP;
@@ -524,6 +544,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
             outResult.add(cell);
             count++;
             totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
+            totalHeapSize += CellUtil.estimatedHeapSizeOf(cell);
             if (totalBytesRead > maxRowSize) {
               throw new RowTooBigException("Max row size allowed: " + maxRowSize
               + ", but the row is bigger than that.");
@@ -532,7 +553,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
             if (!matcher.moreRowsMayExistAfter(cell)) {
-              return false;
+              return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
             }
             seekToNextRow(cell);
           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
@@ -544,20 +565,23 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
           if (limit > 0 && (count == limit)) {
             break LOOP;
           }
+          if (remainingResultSize > 0 && (totalHeapSize >= remainingResultSize)) {
+            break LOOP;
+          }
           continue;
 
         case DONE:
-          return true;
+          return NextState.makeState(NextState.State.MORE_VALUES, totalHeapSize);
 
         case DONE_SCAN:
           close();
-          return false;
+          return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
 
         case SEEK_NEXT_ROW:
           // This is just a relatively simple end of scan fix, to short-cut end
           // us if there is an endKey in the scan.
           if (!matcher.moreRowsMayExistAfter(cell)) {
-            return false;
+            return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
           }
 
           seekToNextRow(cell);
@@ -587,12 +611,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
 
     if (count > 0) {
-      return true;
+      return NextState.makeState(NextState.State.MORE_VALUES, totalHeapSize);
     }
 
     // No more keys
     close();
-    return false;
+    return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
     } finally {
       lock.unlock();
     }
@@ -631,7 +655,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   }
 
   @Override
-  public boolean next(List<Cell> outResult) throws IOException {
+  public NextState next(List<Cell> outResult) throws IOException {
     return next(outResult, -1);
   }
 

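The new exit condition in StoreScanner.next is driven by an accumulated heap-size estimate. A hedged sketch of that accounting, with a hypothetical cell list standing in for the heap traversal; the per-cell estimate is the same CellUtil call the diff adds:

long remainingResultSize = 1024L; // placeholder budget; negative disables the check
long totalHeapSize = 0;
for (Cell cell : cells) {         // "cells" is a stand-in for cells popped off the heap
  totalHeapSize += CellUtil.estimatedHeapSizeOf(cell);
  if (remainingResultSize > 0 && totalHeapSize >= remainingResultSize) {
    break; // surfaced to the caller as State.SIZE_LIMIT_REACHED
  }
}
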
http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index a92c17e..3c3ea6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -248,7 +249,7 @@ public abstract class Compactor {
     throughputController.start(compactionName);
     try {
       do {
-        hasMore = scanner.next(cells, compactionKVMax);
+        hasMore = NextState.hasMoreValues(scanner.next(cells, compactionKVMax));
         if (LOG.isDebugEnabled()) {
           now = EnvironmentEdgeManager.currentTime();
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
index a2648e9..b918354 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -369,7 +370,7 @@ public class AccessControlLists {
       while (true) {
         List<Cell> row = new ArrayList<Cell>();
 
-        boolean hasNext = iScanner.next(row);
+        boolean hasNext = NextState.hasMoreValues(iScanner.next(row));
         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
         byte[] entry = null;
         for (Cell kv : row) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index ca1fba8..3029413 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBu
 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
@@ -739,7 +740,7 @@ public class AccessController extends BaseMasterAndRegionObserver
       do {
         cells.clear();
         // scan with limit as 1 to hold down memory use on wide rows
-        more = scanner.next(cells, 1);
+        more = NextState.hasMoreValues(scanner.next(cells, 1));
         for (Cell cell: cells) {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Found cell " + cell);

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
index e4dc09e..8ea6178 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -563,7 +564,7 @@ public abstract class HBaseTestCase extends TestCase {
     @Override
     public boolean next(List<Cell> results)
     throws IOException {
-      return scanner.next(results);
+      return NextState.hasMoreValues(scanner.next(results));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
index 5659f6b..5c0284f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -45,7 +44,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
new file mode 100644
index 0000000..75fa6e3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -0,0 +1,787 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.ClientScanner;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.RandomRowFilter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * These tests are focused on testing how partial results appear to a client. Partial results are
+ * {@link Result}s that contain only a portion of a row's complete list of cells. Partial results
+ * are formed when the server breaches its maximum result size when trying to service a client's RPC
+ * request. It is the responsibility of the scanner on the client side to recognize when partial
+ * results have been returned and to take action to form the complete results.
+ * <p>
+ * Unless the flag {@link Scan#setAllowPartialResults(boolean)} has been set to true, the caller of
+ * {@link ResultScanner#next()} should never see partial results.
+ */
+@Category(MediumTests.class)
+public class TestPartialResultsFromClientSide {
+  private static final Log LOG = LogFactory.getLog(TestPartialResultsFromClientSide.class);
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static Table TABLE = null;
+
+  /**
+   * Table configuration
+   */
+  private static TableName TABLE_NAME = TableName.valueOf("testTable");
+
+  private static int NUM_ROWS = 5;
+  private static byte[] ROW = Bytes.toBytes("testRow");
+  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
+
+  // Should keep this value at or below 10 to keep generation of expected kv's simple. If above 10
+  // table/row/cf1/... will be followed by table/row/cf10/... instead of table/row/cf2/... which
+  // breaks the simple generation of expected kv's
+  private static int NUM_FAMILIES = 10;
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
+
+  private static int NUM_QUALIFIERS = 10;
+  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
+
+  private static int VALUE_SIZE = 1024;
+  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
+
+  private static int NUM_COLS = NUM_FAMILIES * NUM_QUALIFIERS;
+
+  // Approximation of the heap size of a cell in our table. Should be accessed through
+  // getCellHeapSize().
+  private static long CELL_HEAP_SIZE = -1;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
+  }
+
+  static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
+      byte[][] qualifiers, byte[] cellValue) throws IOException {
+    Table ht = TEST_UTIL.createTable(name, families);
+    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
+    ht.put(puts);
+
+    return ht;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Ensure that the expected key values appear in a result returned from a scanner that is
+   * combining partial results into complete results
+   * @throws Exception
+   */
+  @Test
+  public void testExpectedValuesOfPartialResults() throws Exception {
+    testExpectedValuesOfPartialResults(false);
+    testExpectedValuesOfPartialResults(true);
+  }
+
+  public void testExpectedValuesOfPartialResults(boolean reversed) throws Exception {
+    Scan partialScan = new Scan();
+    partialScan.setMaxVersions();
+    // Max result size of 1 ensures that each RPC request will return a single cell. The scanner
+    // will need to reconstruct the results into a complete result before returning to the caller
+    partialScan.setMaxResultSize(1);
+    partialScan.setReversed(reversed);
+    ResultScanner partialScanner = TABLE.getScanner(partialScan);
+
+    final int startRow = reversed ? ROWS.length - 1 : 0;
+    final int endRow = reversed ? -1 : ROWS.length;
+    final int loopDelta = reversed ? -1 : 1;
+    String message;
+
+    for (int row = startRow; row != endRow; row = row + loopDelta) {
+      message = "Ensuring the expected keyValues are present for row " + row;
+      List<Cell> expectedKeyValues = createKeyValuesForRow(ROWS[row], FAMILIES, QUALIFIERS, VALUE);
+      Result result = partialScanner.next();
+      assertFalse(result.isPartial());
+      verifyResult(result, expectedKeyValues, message);
+    }
+
+    partialScanner.close();
+  }
+
+  /**
+   * Ensure that we only see Results marked as partial when the allowPartial flag is set
+   * @throws Exception
+   */
+  @Test
+  public void testAllowPartialResults() throws Exception {
+    Scan scan = new Scan();
+    scan.setAllowPartialResults(true);
+    scan.setMaxResultSize(1);
+    ResultScanner scanner = TABLE.getScanner(scan);
+    Result result = scanner.next();
+
+    assertTrue(result != null);
+    assertTrue(result.isPartial());
+    assertTrue(result.rawCells() != null);
+    assertTrue(result.rawCells().length == 1);
+
+    scanner.close();
+
+    scan.setAllowPartialResults(false);
+    scanner = TABLE.getScanner(scan);
+    result = scanner.next();
+
+    assertTrue(result != null);
+    assertTrue(!result.isPartial());
+    assertTrue(result.rawCells() != null);
+    assertTrue(result.rawCells().length == NUM_COLS);
+
+    scanner.close();
+  }
+
+  /**
+   * Ensure that the results returned from a scanner that retrieves all results in a single RPC call
+   * match the results that are returned from a scanner that must incrementally combine partial
+   * results into complete results. A variety of scan configurations can be tested
+   * @throws Exception
+   */
+  @Test
+  public void testEquivalenceOfScanResults() throws Exception {
+    Scan oneShotScan = new Scan();
+    oneShotScan.setMaxResultSize(Long.MAX_VALUE);
+
+    Scan partialScan = new Scan(oneShotScan);
+    partialScan.setMaxResultSize(1);
+
+    testEquivalenceOfScanResults(TABLE, oneShotScan, partialScan);
+  }
+
+  public void testEquivalenceOfScanResults(Table table, Scan scan1, Scan scan2) throws Exception {
+    ResultScanner scanner1 = table.getScanner(scan1);
+    ResultScanner scanner2 = table.getScanner(scan2);
+
+    Result r1 = null;
+    Result r2 = null;
+    int count = 0;
+
+    while ((r1 = scanner1.next()) != null) {
+      r2 = scanner2.next();
+
+      assertTrue(r2 != null);
+      compareResults(r1, r2, "Comparing result #" + count);
+      count++;
+    }
+
+    assertTrue(scanner2.next() == null);
+
+    scanner1.close();
+    scanner2.close();
+  }
+
+  /**
+   * Order of cells in partial results matches the ordering of cells from complete results
+   * @throws Exception
+   */
+  @Test
+  public void testOrderingOfCellsInPartialResults() throws Exception {
+    Scan scan = new Scan();
+
+    for (int col = 1; col <= NUM_COLS; col++) {
+      scan.setMaxResultSize(getResultSizeForNumberOfCells(col));
+      testOrderingOfCellsInPartialResults(scan);
+
+      // Test again with a reversed scanner
+      scan.setReversed(true);
+      testOrderingOfCellsInPartialResults(scan);
+    }
+  }
+
+  public void testOrderingOfCellsInPartialResults(final Scan basePartialScan) throws Exception {
+    // Scan that retrieves results in pieces (partials). By setting allowPartialResults to be true
+    // the results will NOT be reconstructed and instead the caller will see the partial results
+    // returned by the server
+    Scan partialScan = new Scan(basePartialScan);
+    partialScan.setAllowPartialResults(true);
+    ResultScanner partialScanner = TABLE.getScanner(partialScan);
+
+    // Scan that retrieves all table results in single RPC request
+    Scan oneShotScan = new Scan(basePartialScan);
+    oneShotScan.setMaxResultSize(Long.MAX_VALUE);
+    oneShotScan.setCaching(ROWS.length);
+    ResultScanner oneShotScanner = TABLE.getScanner(oneShotScan);
+
+    Result oneShotResult = oneShotScanner.next();
+    Result partialResult = null;
+    int iterationCount = 0;
+
+    while (oneShotResult != null && oneShotResult.rawCells() != null) {
+      List<Cell> aggregatePartialCells = new ArrayList<Cell>();
+      do {
+        partialResult = partialScanner.next();
+        assertTrue("Partial Result is null. iteration: " + iterationCount, partialResult != null);
+        assertTrue("Partial cells are null. iteration: " + iterationCount,
+          partialResult.rawCells() != null);
+
+        for (Cell c : partialResult.rawCells()) {
+          aggregatePartialCells.add(c);
+        }
+      } while (partialResult.isPartial());
+
+      assertTrue("Number of cells differs. iteration: " + iterationCount,
+        oneShotResult.rawCells().length == aggregatePartialCells.size());
+      final Cell[] oneShotCells = oneShotResult.rawCells();
+      for (int cell = 0; cell < oneShotCells.length; cell++) {
+        Cell oneShotCell = oneShotCells[cell];
+        Cell partialCell = aggregatePartialCells.get(cell);
+
+        assertTrue("One shot cell was null", oneShotCell != null);
+        assertTrue("Partial cell was null", partialCell != null);
+        assertTrue("Cell differs. oneShotCell:" + oneShotCell + " partialCell:" + partialCell,
+          oneShotCell.equals(partialCell));
+      }
+
+      oneShotResult = oneShotScanner.next();
+      iterationCount++;
+    }
+
+    assertTrue(partialScanner.next() == null);
+
+    partialScanner.close();
+    oneShotScanner.close();
+  }
+
+  /**
+   * Setting the max result size allows us to control how many cells we expect to see on each call
+   * to next on the scanner. Test a variety of different sizes for correctness
+   * @throws Exception
+   */
+  @Test
+  public void testExpectedNumberOfCellsPerPartialResult() throws Exception {
+    Scan scan = new Scan();
+    testExpectedNumberOfCellsPerPartialResult(scan);
+
+    scan.setReversed(true);
+    testExpectedNumberOfCellsPerPartialResult(scan);
+  }
+
+  public void testExpectedNumberOfCellsPerPartialResult(Scan baseScan) throws Exception {
+    for (int expectedCells = 1; expectedCells <= NUM_COLS; expectedCells++) {
+      testExpectedNumberOfCellsPerPartialResult(baseScan, expectedCells);
+    }
+  }
+
+  public void testExpectedNumberOfCellsPerPartialResult(Scan baseScan, int expectedNumberOfCells)
+      throws Exception {
+
+    if (LOG.isInfoEnabled()) LOG.info("groupSize:" + expectedNumberOfCells);
+
+    // Use the cellHeapSize to set maxResultSize such that we know how many cells to expect back
+    // from the call. The returned results should NOT exceed expectedNumberOfCells but may be less
+    // than it in cases where expectedNumberOfCells is not an exact multiple of the number of
+    // columns in the table.
+    Scan scan = new Scan(baseScan);
+    scan.setAllowPartialResults(true);
+    scan.setMaxResultSize(getResultSizeForNumberOfCells(expectedNumberOfCells));
+
+    ResultScanner scanner = TABLE.getScanner(scan);
+    Result result = null;
+    byte[] prevRow = null;
+    while ((result = scanner.next()) != null) {
+      assertTrue(result.rawCells() != null);
+
+      // Cases when cell count won't equal expectedNumberOfCells:
+      // 1. Returned result is the final result needed to form the complete result for that row
+      // 2. It is the first result we have seen for that row and thus may have been fetched as
+      // the last group of cells that fit inside the maxResultSize
+      assertTrue(
+        "Result's cell count differed from expected number. result: " + result,
+        result.rawCells().length == expectedNumberOfCells || !result.isPartial()
+          || !Bytes.equals(prevRow, result.getRow()));
+      prevRow = result.getRow();
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * @return The approximate heap size of a cell in the test table. All cells should have
+   *         approximately the same heap size, so the value is cached to avoid repeating the
+   *         calculation
+   * @throws Exception
+   */
+  private long getCellHeapSize() throws Exception {
+    if (CELL_HEAP_SIZE == -1) {
+      // Do a partial scan that will return a single result with a single cell
+      Scan scan = new Scan();
+      scan.setMaxResultSize(1);
+      scan.setAllowPartialResults(true);
+      ResultScanner scanner = TABLE.getScanner(scan);
+
+      Result result = scanner.next();
+
+      assertTrue(result != null);
+      assertTrue(result.rawCells() != null);
+      assertTrue(result.rawCells().length == 1);
+
+      CELL_HEAP_SIZE = CellUtil.estimatedHeapSizeOf(result.rawCells()[0]);
+      if (LOG.isInfoEnabled()) LOG.info("Cell heap size: " + CELL_HEAP_SIZE);
+      scanner.close();
+    }
+
+    return CELL_HEAP_SIZE;
+  }
+
+  /**
+   * @param numberOfCells
+   * @return the result size that should be used in {@link Scan#setMaxResultSize(long)} if you want
+   *         the server to return exactly numberOfCells cells
+   * @throws Exception
+   */
+  private long getResultSizeForNumberOfCells(int numberOfCells) throws Exception {
+    return getCellHeapSize() * numberOfCells;
+  }
+
+  /**
+   * Test various combinations of batching and partial results for correctness
+   */
+  @Test
+  public void testPartialResultsAndBatch() throws Exception {
+    for (int batch = 1; batch <= NUM_COLS / 4; batch++) {
+      for (int cellsPerPartial = 1; cellsPerPartial <= NUM_COLS / 4; cellsPerPartial++) {
+        testPartialResultsAndBatch(batch, cellsPerPartial);
+      }
+    }
+  }
+
+  public void testPartialResultsAndBatch(final int batch, final int cellsPerPartialResult)
+      throws Exception {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("batch: " + batch + " cellsPerPartialResult: " + cellsPerPartialResult);
+    }
+
+    Scan scan = new Scan();
+    scan.setMaxResultSize(getResultSizeForNumberOfCells(cellsPerPartialResult));
+    scan.setBatch(batch);
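+    // When both batch and maxResultSize are set, each partial result should be capped by the
+    // smaller of the two limits (see the assertion below)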
+    ResultScanner scanner = TABLE.getScanner(scan);
+    Result result = null;
+
+    while ((result = scanner.next()) != null) {
+      assertTrue(result.rawCells() != null);
+
+      if (result.isPartial()) {
+        final String error =
+            "Cells:" + result.rawCells().length + " Batch size:" + batch
+                + " cellsPerPartialResult:" + cellsPerPartialResult;
+        assertTrue(error, result.rawCells().length <= Math.min(batch, cellsPerPartialResult));
+      } else {
+        assertTrue(result.rawCells().length <= batch);
+      }
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * Test the method {@link Result#createCompleteResult(List)}
+   * @throws Exception
+   */
+  @Test
+  public void testPartialResultsReassembly() throws Exception {
+    Scan scan = new Scan();
+    testPartialResultsReassembly(scan);
+    scan.setReversed(true);
+    testPartialResultsReassembly(scan);
+  }
+
+  public void testPartialResultsReassembly(Scan scanBase) throws Exception {
+    Scan partialScan = new Scan(scanBase);
+    partialScan.setMaxResultSize(1);
+    partialScan.setAllowPartialResults(true);
+    ResultScanner partialScanner = TABLE.getScanner(partialScan);
+
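+    // The one-shot scanner has an effectively unlimited maxResultSize, so each next() should
+    // return a complete row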
+    Scan oneShotScan = new Scan(scanBase);
+    oneShotScan.setMaxResultSize(Long.MAX_VALUE);
+    ResultScanner oneShotScanner = TABLE.getScanner(oneShotScan);
+
+    ArrayList<Result> partials = new ArrayList<>();
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Result partialResult = null;
+      Result completeResult = null;
+      Result oneShotResult = null;
+      partials.clear();
+
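+      // Keep fetching partials until the final (non-partial) piece of the row arrives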
+      do {
+        partialResult = partialScanner.next();
+        partials.add(partialResult);
+      } while (partialResult.isPartial());
+
+      completeResult = Result.createCompleteResult(partials);
+      oneShotResult = oneShotScanner.next();
+
+      compareResults(completeResult, oneShotResult, null);
+    }
+
+    assertTrue(oneShotScanner.next() == null);
+    assertTrue(partialScanner.next() == null);
+
+    oneShotScanner.close();
+    partialScanner.close();
+  }
+
+  /**
+   * When reconstructing the complete result from its partials we ensure that the row of each
+   * partial result is the same. If one of the rows differs, an exception is thrown.
+   */
+  @Test
+  public void testExceptionThrownOnMismatchedPartialResults() throws IOException {
+    assertTrue(NUM_ROWS >= 2);
+
+    ArrayList<Result> partials = new ArrayList<>();
+    Scan scan = new Scan();
+    scan.setMaxResultSize(Long.MAX_VALUE);
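+    // With no meaningful size limit each result is complete, so r1 and r2 come from different rows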
+    ResultScanner scanner = TABLE.getScanner(scan);
+    Result r1 = scanner.next();
+    partials.add(r1);
+    Result r2 = scanner.next();
+    partials.add(r2);
+
+    assertFalse(Bytes.equals(r1.getRow(), r2.getRow()));
+
+    try {
+      Result.createCompleteResult(partials);
+      fail("r1 and r2 are from different rows. It should not be possible to combine them into"
+          + " a single result");
+    } catch (IOException e) {
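+      // Expected: createCompleteResult must reject partials whose rows differ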
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * When a scan has a filter where {@link org.apache.hadoop.hbase.filter.Filter#hasFilterRow()} is
+   * true, the scanner cannot return partial results because the entire row must be read before the
+   * filter's include/exclude decision can be made
+   */
+  @Test
+  public void testNoPartialResultsWhenFilterPresent() throws Exception {
+    Scan scan = new Scan();
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+    // If a filter's hasFilterRow() is true then partial results must not be returned, else
+    // server-side filter application would break. RandomRowFilter(1.0f) accepts every row.
+    scan.setFilter(new RandomRowFilter(1.0f));
+    ResultScanner scanner = TABLE.getScanner(scan);
+
+    Result r = null;
+    while ((r = scanner.next()) != null) {
+      assertFalse(r.isPartial());
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * Examine the interaction between the maxResultSize and caching. If the caching limit is reached
+   * before the maxResultSize limit, we should not see partial results. On the other hand, if the
+   * maxResultSize limit is reached before the caching limit, it is likely that partial results will
+   * be seen.
+   * @throws Exception
+   */
+  @Test
+  public void testPartialResultsAndCaching() throws Exception {
+    for (int caching = 1; caching <= NUM_ROWS; caching++) {
+      for (int maxResultRows = 0; maxResultRows <= NUM_ROWS; maxResultRows++) {
+        testPartialResultsAndCaching(maxResultRows, caching);
+      }
+    }
+  }
+
+  /**
+   * @param resultSizeRowLimit The row limit that will be enforced through maxResultSize
+   * @param cachingRowLimit The row limit that will be enforced through caching
+   * @throws Exception
+   */
+  public void testPartialResultsAndCaching(int resultSizeRowLimit, int cachingRowLimit)
+      throws Exception {
+    Scan scan = new Scan();
+    scan.setAllowPartialResults(true);
+
+    // The number of cells specified in the call to getResultSizeForNumberOfCells is offset to
+    // ensure that the result size we specify is not an exact multiple of the number of cells
+    // in a row. This ensures that partial results will be returned when the result size limit
+    // is reached before the caching limit.
+    int cellOffset = NUM_COLS / 3;
+    long maxResultSize = getResultSizeForNumberOfCells(resultSizeRowLimit * NUM_COLS + cellOffset);
+    scan.setMaxResultSize(maxResultSize);
+    scan.setCaching(cachingRowLimit);
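+    // caching limits the rows per RPC while maxResultSize limits the bytes per RPC; whichever
+    // limit is hit first ends the request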
+
+    ResultScanner scanner = TABLE.getScanner(scan);
+    ClientScanner clientScanner = (ClientScanner) scanner;
+    Result r = null;
+
+    // resultSizeRowLimit approximates the number of rows that fit into the specified max result
+    // size. If this approximation is less than the caching limit, we expect the max result size
+    // limit to be hit before the caching limit, and thus partial results may be seen
+    boolean expectToSeePartialResults = resultSizeRowLimit < cachingRowLimit;
+    while ((r = clientScanner.next()) != null) {
+      assertTrue(!r.isPartial() || expectToSeePartialResults);
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * Small scans should not return partial results; doing so would prevent a small scan from
+   * retrieving all of its results in a single RPC request, which is what makes small scans
+   * useful. Thus, ensure that even when {@link Scan#getAllowPartialResults()} is true, small
+   * scans do not return partial results
+   * @throws Exception
+   */
+  @Test
+  public void testSmallScansDoNotAllowPartials() throws Exception {
+    Scan scan = new Scan();
+    testSmallScansDoNotAllowPartials(scan);
+    scan.setReversed(true);
+    testSmallScansDoNotAllowPartials(scan);
+  }
+
+  public void testSmallScansDoNotAllowPartials(Scan baseScan) throws Exception {
+    Scan scan = new Scan(baseScan);
+    scan.setAllowPartialResults(true);
+    scan.setSmall(true);
+    scan.setMaxResultSize(1);
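+    // Even with a tiny maxResultSize, a small scan must return whole rows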
+
+    ResultScanner scanner = TABLE.getScanner(scan);
+    Result r = null;
+
+    while ((r = scanner.next()) != null) {
+      assertFalse(r.isPartial());
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * Make puts to put the input value into each combination of row, family, and qualifier
+   * @param rows
+   * @param families
+   * @param qualifiers
+   * @param value
+   * @return a list of puts, one per row, covering every family and qualifier combination
+   * @throws IOException
+   */
+  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
+      byte[] value) throws IOException {
+    Put put;
+    ArrayList<Put> puts = new ArrayList<>();
+
+    for (int row = 0; row < rows.length; row++) {
+      put = new Put(rows[row]);
+      for (int fam = 0; fam < families.length; fam++) {
+        for (int qual = 0; qual < qualifiers.length; qual++) {
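+          // Note: the qualifier index doubles as the cell timestamp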
+          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
+          put.add(kv);
+        }
+      }
+      puts.add(put);
+    }
+
+    return puts;
+  }
+
+  /**
+   * Make key values to represent each possible combination of family and qualifier in the specified
+   * row.
+   * @param row
+   * @param families
+   * @param qualifiers
+   * @param value
+   * @return the list of expected cells for the row
+   */
+  static ArrayList<Cell> createKeyValuesForRow(byte[] row, byte[][] families, byte[][] qualifiers,
+      byte[] value) {
+    ArrayList<Cell> outList = new ArrayList<>();
+    for (int fam = 0; fam < families.length; fam++) {
+      for (int qual = 0; qual < qualifiers.length; qual++) {
+        outList.add(new KeyValue(row, families[fam], qualifiers[qual], qual, value));
+      }
+    }
+    return outList;
+  }
+
+  /**
+   * Verifies that result contains all the key values within expKvList. Fails the test otherwise
+   * @param result
+   * @param expKvList
+   * @param msg
+   */
+  static void verifyResult(Result result, List<Cell> expKvList, String msg) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info(msg);
+      LOG.info("Expected count: " + expKvList.size());
+      LOG.info("Actual count: " + result.size());
+    }
+
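+    // Nothing to verify cell by cell when no cells are expected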
+    if (expKvList.size() == 0) return;
+
+    int i = 0;
+    for (Cell kv : result.rawCells()) {
+      if (i >= expKvList.size()) {
+        break; // we will check the size later
+      }
+
+      Cell kvExp = expKvList.get(i++);
+      assertTrue("Not equal. get kv: " + kv.toString() + " exp kv: " + kvExp.toString(),
+        kvExp.equals(kv));
+    }
+
+    assertEquals(expKvList.size(), result.size());
+  }
+
+  /**
+   * Compares two results and fails the test if the results are different
+   * @param r1
+   * @param r2
+   * @param message
+   */
+  static void compareResults(Result r1, Result r2, final String message) {
+    if (LOG.isInfoEnabled()) {
+      if (message != null) LOG.info(message);
+      LOG.info("r1: " + r1);
+      LOG.info("r2: " + r2);
+    }
+
+    final String failureMessage = "Results r1:" + r1 + " r2:" + r2 + " are not equivalent";
+    if (r1 == null && r2 == null) fail(failureMessage);
+    else if (r1 == null || r2 == null) fail(failureMessage);
+
+    try {
+      Result.compareResults(r1, r2);
+    } catch (Exception e) {
+      fail(failureMessage);
+    }
+  }
+
+  @Test
+  public void testReadPointAndPartialResults() throws Exception {
+    TableName testName = TableName.valueOf("testReadPointAndPartialResults");
+    int numRows = 5;
+    int numFamilies = 5;
+    int numQualifiers = 5;
+    byte[][] rows = HTestConst.makeNAscii(Bytes.toBytes("testRow"), numRows);
+    byte[][] families = HTestConst.makeNAscii(Bytes.toBytes("testFamily"), numFamilies);
+    byte[][] qualifiers = HTestConst.makeNAscii(Bytes.toBytes("testQualifier"), numQualifiers);
+    byte[] value = Bytes.createMaxByteArray(100);
+
+    Table tmpTable = createTestTable(testName, rows, families, qualifiers, value);
+
+    Scan scan = new Scan();
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+
+    // Open scanner before deletes
+    ResultScanner scanner = tmpTable.getScanner(scan);
+
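+    // The delete timestamps (0 and 1) line up with the timestamps assigned at load time, where
+    // the qualifier index was used as the cell timestamp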
+    Delete delete1 = new Delete(rows[0]);
+    delete1.addColumn(families[0], qualifiers[0], 0);
+    tmpTable.delete(delete1);
+
+    Delete delete2 = new Delete(rows[1]);
+    delete2.addColumn(families[1], qualifiers[1], 1);
+    tmpTable.delete(delete2);
+
+    // Should see all cells because scanner was opened prior to deletes
+    int scannerCount = countCellsFromScanner(scanner);
+    int expectedCount = numRows * numFamilies * numQualifiers;
+    assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount,
+      scannerCount == expectedCount);
+
+    // Re-opened scanner should see two fewer cells now that the deletes are visible
+    scanner = tmpTable.getScanner(scan);
+    scannerCount = countCellsFromScanner(scanner);
+    expectedCount = numRows * numFamilies * numQualifiers - 2;
+    assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount,
+      scannerCount == expectedCount);
+
+    scanner = tmpTable.getScanner(scan);
+    // Put in 2 new cells. Their timestamps differ from those of the deleted cells
+    Put put1 = new Put(rows[0]);
+    put1.add(new KeyValue(rows[0], families[0], qualifiers[0], 1, value));
+    tmpTable.put(put1);
+
+    Put put2 = new Put(rows[1]);
+    put2.add(new KeyValue(rows[1], families[1], qualifiers[1], 2, value));
+    tmpTable.put(put2);
+
+    // Scanner opened prior to puts. Cell count shouldn't have changed
+    scannerCount = countCellsFromScanner(scanner);
+    expectedCount = numRows * numFamilies * numQualifiers - 2;
+    assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount,
+      scannerCount == expectedCount);
+
+    // Now the scanner should see the cells that were added by puts
+    scanner = tmpTable.getScanner(scan);
+    scannerCount = countCellsFromScanner(scanner);
+    expectedCount = numRows * numFamilies * numQualifiers;
+    assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount,
+      scannerCount == expectedCount);
+
+    TEST_UTIL.deleteTable(testName);
+  }
+
+  /**
+   * Exhausts the scanner by calling next repeatedly. Once completely exhausted, closes the
+   * scanner and returns the total cell count
+   * @param scanner the scanner to exhaust and close
+   * @return the total number of cells returned by the scanner
+   * @throws Exception
+   */
+  private int countCellsFromScanner(ResultScanner scanner) throws Exception {
+    Result result = null;
+    int numCells = 0;
+    while ((result = scanner.next()) != null) {
+      numCells += result.rawCells().length;
+    }
+
+    scanner.close();
+    return numCells;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java
index 1f6dc98..cdfb774 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -93,7 +94,7 @@ public class TestIntraRowPagination {
       RegionScanner scanner = region.getScanner(scan);
       List<Cell> kvListScan = new ArrayList<Cell>();
       List<Cell> results = new ArrayList<Cell>();
-      while (scanner.next(results) || !results.isEmpty()) {
+      while (NextState.hasMoreValues(scanner.next(results)) || !results.isEmpty()) {
         kvListScan.addAll(results);
         results.clear();
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
index 8e37470..68053c0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
@@ -28,13 +28,13 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.protobuf.RpcCallback;
@@ -89,7 +89,7 @@ implements Coprocessor, CoprocessorService {
       boolean hasMore = false;
       do {
         curVals.clear();
-        hasMore = scanner.next(curVals);
+        hasMore = NextState.hasMoreValues(scanner.next(curVals));
         for (Cell kv : curVals) {
           if (CellUtil.matchingQualifier(kv, qualifier)) {
             sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
index f91b8a6..d85ef5d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationW
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.protobuf.RpcCallback;
@@ -97,7 +98,7 @@ implements Coprocessor, CoprocessorService  {
       boolean hasMore = false;
       do {
         curVals.clear();
-        hasMore = scanner.next(curVals);
+        hasMore = NextState.hasMoreValues(scanner.next(curVals));
         for (Cell kv : curVals) {
           if (CellUtil.matchingQualifier(kv, qualifier)) {
             sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
index a16fc19..3a38297 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationW
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.protobuf.RpcCallback;
@@ -97,7 +98,7 @@ implements Coprocessor, CoprocessorService  {
       boolean hasMore = false;
       do {
         curVals.clear();
-        hasMore = scanner.next(curVals);
+        hasMore = NextState.hasMoreValues(scanner.next(curVals));
         for (Cell kv : curVals) {
           if (CellUtil.matchingQualifier(kv, qualifier)) {
             sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index ce76e8a..75fe93d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -85,28 +85,39 @@ public class TestCoprocessorInterface {
     }
 
     @Override
-    public boolean next(List<Cell> results) throws IOException {
+    public NextState next(List<Cell> results) throws IOException {
       return delegate.next(results);
     }
 
     @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
+    public NextState next(List<Cell> result, int limit) throws IOException {
       return delegate.next(result, limit);
     }
 
     @Override
-    public boolean nextRaw(List<Cell> result) 
+    public NextState next(List<Cell> result, int limit, long remainingResultSize)
         throws IOException {
-      return delegate.nextRaw(result);
+      return delegate.next(result, limit, remainingResultSize);
     }
 
     @Override
-    public boolean nextRaw(List<Cell> result, int limit)
+    public NextState nextRaw(List<Cell> result)
         throws IOException {
+      return delegate.nextRaw(result);
+    }
+
+    @Override
+    public NextState nextRaw(List<Cell> result, int limit) throws IOException {
       return delegate.nextRaw(result, limit);
     }
 
     @Override
+    public NextState nextRaw(List<Cell> result, int limit, long remainingResultSize)
+        throws IOException {
+      return delegate.nextRaw(result, limit, remainingResultSize);
+    }
+
+    @Override
     public void close() throws IOException {
       delegate.close();
     }
@@ -135,6 +146,12 @@ public class TestCoprocessorInterface {
     public long getMvccReadPoint() {
       return delegate.getMvccReadPoint();
     }
+
+    @Override
+    public int getBatch() {
+      return delegate.getBatch();
+    }
+
   }
 
   public static class CoprocessorImpl extends BaseRegionObserver {

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index bf5227b..a4963ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -433,17 +433,24 @@ public class TestRegionObserverInterface {
         Store store, final InternalScanner scanner, final ScanType scanType) {
       return new InternalScanner() {
         @Override
-        public boolean next(List<Cell> results) throws IOException {
+        public NextState next(List<Cell> results) throws IOException {
           return next(results, -1);
         }
 
         @Override
-        public boolean next(List<Cell> results, int limit)
-            throws IOException{
+        public NextState next(List<Cell> results, int limit) throws IOException {
+          return next(results, limit, -1);
+        }
+
+        @Override
+        public NextState next(List<Cell> results, int limit, long remainingResultSize)
+            throws IOException {
           List<Cell> internalResults = new ArrayList<Cell>();
           boolean hasMore;
+          NextState state;
           do {
-            hasMore = scanner.next(internalResults, limit);
+            state = scanner.next(internalResults, limit, remainingResultSize);
+            hasMore = state != null && state.hasMoreValues();
             if (!internalResults.isEmpty()) {
               long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0)));
               if (row % 2 == 0) {
@@ -458,7 +465,7 @@ public class TestRegionObserverInterface {
           if (!internalResults.isEmpty()) {
             results.addAll(internalResults);
           }
-          return hasMore;
+          return state;
         }
 
         @Override
@@ -772,4 +779,4 @@ public class TestRegionObserverInterface {
       writer.close();
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java
index 2e51c82..abd9921 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.filter;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,12 +27,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.testclassification.FilterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -97,7 +105,7 @@ public class TestColumnPrefixFilter {
 
         InternalScanner scanner = region.getScanner(scan);
         List<Cell> results = new ArrayList<Cell>();
-        while(scanner.next(results));
+        while (NextState.hasMoreValues(scanner.next(results)));
         assertEquals(prefixMap.get(s).size(), results.size());
       }
     } finally {
@@ -162,7 +170,7 @@ public class TestColumnPrefixFilter {
 
         InternalScanner scanner = region.getScanner(scan);
         List<Cell> results = new ArrayList<Cell>();
-        while(scanner.next(results));
+        while (NextState.hasMoreValues(scanner.next(results)));
         assertEquals(prefixMap.get(s).size(), results.size());
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/de9791e9/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java
index 40a4c43..97f0874 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java
@@ -19,6 +19,10 @@
 
 package org.apache.hadoop.hbase.filter;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -26,24 +30,26 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.testclassification.FilterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import org.junit.experimental.categories.Category;
 
 @Category({FilterTests.class, SmallTests.class})
@@ -145,7 +151,7 @@ public class TestDependentColumnFilter {
     int i = 0;
     int cells = 0;
     for (boolean done = true; done; i++) {
-      done = scanner.next(results);
+      done = NextState.hasMoreValues(scanner.next(results));
       Arrays.sort(results.toArray(new KeyValue[results.size()]),
           KeyValue.COMPARATOR);
       LOG.info("counter=" + i + ", " + results);

