Revert "HBASE-11544 [Ergonomics] hbase.client.scanner.caching is dogged and will try to return batch even if it means OOME"
The subject above references the wrong JIRA, so revert; an addendum is also
on the way.
This reverts commit 26ba621e47e886fb3b1336f2201b8efbda86ff91.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8cd3001f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8cd3001f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8cd3001f
Branch: refs/heads/hbase-12439
Commit: 8cd3001f817915df20a4d209c450ac9b69b915d7
Parents: 26ba621
Author: stack <stack@apache.org>
Authored: Wed Apr 8 09:32:09 2015 -0700
Committer: stack <stack@apache.org>
Committed: Wed Apr 8 09:32:09 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/HTableDescriptor.java | 22 +-
.../hadoop/hbase/client/ClientScanner.java | 3 -
.../client/ScannerCallableWithReplicas.java | 34 +-
.../java/org/apache/hadoop/hbase/CellUtil.java | 1 -
.../org/apache/hadoop/hbase/HConstants.java | 19 +-
.../coprocessor/example/BulkDeleteEndpoint.java | 3 +-
.../coprocessor/example/RowCountEndpoint.java | 5 +-
.../hbase/client/ClientSideRegionScanner.java | 8 +-
.../coprocessor/AggregateImplementation.java | 15 +-
.../hadoop/hbase/regionserver/HRegion.java | 271 +++++-----
.../hbase/regionserver/InternalScanner.java | 209 +++++++-
.../hadoop/hbase/regionserver/KeyValueHeap.java | 44 +-
.../regionserver/NoLimitScannerContext.java | 94 ----
.../hbase/regionserver/RSRpcServices.java | 66 ++-
.../hbase/regionserver/RegionScanner.java | 50 +-
.../hbase/regionserver/ScannerContext.java | 527 -------------------
.../hadoop/hbase/regionserver/StoreFlusher.java | 7 +-
.../hadoop/hbase/regionserver/StoreScanner.java | 78 +--
.../regionserver/compactions/Compactor.java | 7 +-
.../security/access/AccessControlLists.java | 3 +-
.../hbase/security/access/AccessController.java | 6 +-
.../org/apache/hadoop/hbase/HBaseTestCase.java | 3 +-
.../hbase/TestPartialResultsFromClientSide.java | 8 +-
.../hbase/client/TestIntraRowPagination.java | 3 +-
.../hadoop/hbase/client/TestReplicasClient.java | 90 +---
.../coprocessor/ColumnAggregationEndpoint.java | 3 +-
.../ColumnAggregationEndpointNullResponse.java | 3 +-
.../ColumnAggregationEndpointWithErrors.java | 3 +-
.../coprocessor/TestCoprocessorInterface.java | 23 +-
.../TestRegionObserverInterface.java | 19 +-
.../hbase/filter/TestColumnPrefixFilter.java | 7 +-
.../hbase/filter/TestDependentColumnFilter.java | 3 +-
.../apache/hadoop/hbase/filter/TestFilter.java | 29 +-
.../filter/TestInvocationRecordFilter.java | 5 +-
.../filter/TestMultipleColumnPrefixFilter.java | 9 +-
.../hbase/io/encoding/TestPrefixTree.java | 11 +-
.../TestScannerSelectionUsingKeyRange.java | 5 +-
.../io/hfile/TestScannerSelectionUsingTTL.java | 3 +-
.../hbase/regionserver/TestAtomicOperation.java | 9 +-
.../hbase/regionserver/TestBlocksScanned.java | 8 +-
.../hbase/regionserver/TestColumnSeeking.java | 5 +-
.../hbase/regionserver/TestDefaultMemStore.java | 9 +-
.../regionserver/TestGetClosestAtOrBefore.java | 5 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 104 ++--
.../hbase/regionserver/TestKeepDeletes.java | 6 +-
.../hbase/regionserver/TestMajorCompaction.java | 9 +-
.../regionserver/TestMultiColumnScanner.java | 3 +-
.../TestRegionMergeTransaction.java | 3 +-
.../regionserver/TestReversibleScanners.java | 3 +-
.../regionserver/TestScanWithBloomError.java | 3 +-
.../hadoop/hbase/regionserver/TestScanner.java | 11 +-
.../regionserver/TestSeekOptimizations.java | 3 +-
.../regionserver/TestSplitTransaction.java | 3 +-
.../hbase/regionserver/TestStoreScanner.java | 53 +-
.../hbase/regionserver/TestStripeCompactor.java | 16 +-
.../hbase/regionserver/TestWideScanner.java | 3 +-
.../compactions/TestStripeCompactionPolicy.java | 18 +-
.../hbase/regionserver/wal/TestWALReplay.java | 3 +-
.../apache/hadoop/hbase/util/TestMergeTool.java | 3 +-
59 files changed, 787 insertions(+), 1192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 4fae2c7..a0ab484 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -159,9 +159,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
*/
@Deprecated
public static final String DEFERRED_LOG_FLUSH = "DEFERRED_LOG_FLUSH";
- /**
- * @deprecated
- */
@Deprecated
private static final Bytes DEFERRED_LOG_FLUSH_KEY =
new Bytes(Bytes.toBytes(DEFERRED_LOG_FLUSH));
@@ -318,7 +315,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
* Construct a table descriptor specifying a byte array table name
* @param name Table name.
* @see <a href="HADOOP-1581">HADOOP-1581 HBASE: Un-openable tablename bug</a>
- * @deprecated
*/
@Deprecated
public HTableDescriptor(final byte[] name) {
@@ -329,7 +325,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
* Construct a table descriptor specifying a String table name
* @param name Table name.
* @see <a href="HADOOP-1581">HADOOP-1581 HBASE: Un-openable tablename bug</a>
- * @deprecated
*/
@Deprecated
public HTableDescriptor(final String name) {
@@ -704,7 +699,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
* Set the name of the table.
*
* @param name name of table
- * @deprecated
*/
@Deprecated
public HTableDescriptor setName(byte[] name) {
@@ -712,9 +706,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
return this;
}
- /**
- * @deprecated
- */
@Deprecated
public HTableDescriptor setName(TableName name) {
this.name = name;
@@ -1349,7 +1340,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
* @param rootdir qualified path of HBase root directory
* @param tableName name of table
* @return {@link Path} for table
- * @deprecated
*/
@Deprecated
public static Path getTableDir(Path rootdir, final byte [] tableName) {
@@ -1363,7 +1353,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
/** Table descriptor for <code>hbase:meta</code> catalog table
* Deprecated, use TableDescriptors#get(TableName.META_TABLE) or
* Admin#getTableDescriptor(TableName.META_TABLE) instead.
- * @deprecated
*/
@Deprecated
public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
@@ -1423,18 +1412,12 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
.setCacheDataInL1(true)
});
- /**
- * @deprecated
- */
@Deprecated
public HTableDescriptor setOwner(User owner) {
return setOwnerString(owner != null ? owner.getShortName() : null);
}
- /**
- * used by admin.rb:alter(table_name,*args) to update owner.
- * @deprecated
- */
+ // used by admin.rb:alter(table_name,*args) to update owner.
@Deprecated
public HTableDescriptor setOwnerString(String ownerString) {
if (ownerString != null) {
@@ -1445,9 +1428,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
return this;
}
- /**
- * @deprecated
- */
@Deprecated
public String getOwnerString() {
if (getValue(OWNER_KEY) != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index ccd8c2d..05a780c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -401,9 +401,6 @@ public class ClientScanner extends AbstractClientScanner {
// happens for the cases where we see exceptions. Since only openScanner
// would have happened, values would be null
if (values == null && callable.switchedToADifferentReplica()) {
- // Any accumulated partial results are no longer valid since the callable will
- // openScanner with the correct startkey and we must pick up from there
- clearPartialResults();
this.currentRegion = callable.getHRegionInfo();
continue;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 7ba152b..ca6ab05 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -292,7 +292,14 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
continue; //this was already scheduled earlier
}
ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
- setStartRowForReplicaCallable(s);
+
+ if (this.lastResult != null) {
+ if(s.getScan().isReversed()){
+ s.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
+ }else {
+ s.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
+ }
+ }
outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s);
cs.submit(retryingOnReplica, scannerTimeout, id);
@@ -300,31 +307,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
return max - min + 1;
}
- /**
- * Set the start row for the replica callable based on the state of the last result received.
- * @param callable The callable to set the start row on
- */
- private void setStartRowForReplicaCallable(ScannerCallable callable) {
- if (this.lastResult == null || callable == null) return;
-
- if (this.lastResult.isPartial()) {
- // The last result was a partial result which means we have not received all of the cells
- // for this row. Thus, use the last result's row as the start row. If a replica switch
- // occurs, the scanner will ensure that any accumulated partial results are cleared,
- // and the scan can resume from this row.
- callable.getScan().setStartRow(this.lastResult.getRow());
- } else {
- // The last result was not a partial result which means it contained all of the cells for
- // that row (we no longer need any information from it). Set the start row to the next
- // closest row that could be seen.
- if (callable.getScan().isReversed()) {
- callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
- } else {
- callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
- }
- }
- }
-
@VisibleForTesting
boolean isAnyRPCcancelled() {
return someRPCcancelled;
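
For reference, the replica-switch logic inlined above resumes the scan by deriving a new start row from the last Result seen. A minimal sketch of that derivation (createClosestRowBefore and Bytes.add are the utilities used in the hunk above; lastRow is assumed non-null; the method name is illustrative, not part of this patch):

    // Forward scans resume at the smallest row key strictly after lastRow;
    // reversed scans resume at the closest possible row before it.
    byte[] resumeRowForReplica(byte[] lastRow, boolean reversed) {
      if (reversed) {
        return createClosestRowBefore(lastRow);
      }
      // Appending a single zero byte yields the lexicographically next row key.
      return Bytes.add(lastRow, new byte[1]);
    }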
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 2060488..bce3957 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -574,7 +574,6 @@ public final class CellUtil {
* backwards compatible with estimations done by older clients. We need to
* pretend that tags never exist and cells aren't serialized with tag
* length included. See HBASE-13262 and HBASE-13303
- * @deprecated See above comment
*/
@Deprecated
public static long estimatedHeapSizeOfWithoutTags(final Cell cell) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index b5a6318..19e251a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -391,7 +391,7 @@ public final class HConstants {
/**
* The hbase:meta table's name.
- * @deprecated For 0.94 to 0.96 compatibility. Replaced by define in TableName
+ *
*/
@Deprecated // for compat from 0.94 -> 0.96.
public static final byte[] META_TABLE_NAME = TableName.META_TABLE_NAME.getName();
@@ -579,7 +579,7 @@ public final class HConstants {
* 1, 2, 3, 5, 10, 20, 40, 100, 100, 100.
* With 100ms, a back-off of 200 means 20s
*/
- public static final int [] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200};
+ public static final int RETRY_BACKOFF[] = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200};
public static final String REGION_IMPL = "hbase.hregion.impl";
@@ -780,8 +780,7 @@ public final class HConstants {
/**
* timeout for short operation RPC
*/
- public static final String HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY =
- "hbase.rpc.shortoperation.timeout";
+ public static final String HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY = "hbase.rpc.shortoperation.timeout";
/**
* Default value of {@link #HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY}
@@ -836,8 +835,8 @@ public final class HConstants {
*/
public static final float HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD = 0.2f;
- public static final Pattern CP_HTD_ATTR_KEY_PATTERN = Pattern.compile(
- "^coprocessor\\$([0-9]+)$", Pattern.CASE_INSENSITIVE);
+ public static final Pattern CP_HTD_ATTR_KEY_PATTERN = Pattern.compile
+ ("^coprocessor\\$([0-9]+)$", Pattern.CASE_INSENSITIVE);
public static final Pattern CP_HTD_ATTR_VALUE_PATTERN =
Pattern.compile("(^[^\\|]*)\\|([^\\|]+)\\|[\\s]*([\\d]*)[\\s]*(\\|.*)?$");
@@ -890,7 +889,7 @@ public final class HConstants {
* 1 => Abort only when all of the handlers have died
*/
public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT =
- "hbase.regionserver.handler.abort.on.error.percent";
+ "hbase.regionserver.handler.abort.on.error.percent";
public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = 0.5;
//High priority handlers to deal with admin requests and system table operation requests
@@ -950,8 +949,7 @@ public final class HConstants {
public static final String DEFAULT_WAL_STORAGE_POLICY = "NONE";
/** Region in Transition metrics threshold time */
- public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
- "hbase.metrics.rit.stuck.warning.threshold";
+ public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD="hbase.metrics.rit.stuck.warning.threshold";
public static final String LOAD_BALANCER_SLOP_KEY = "hbase.regions.slop";
@@ -1046,8 +1044,7 @@ public final class HConstants {
* 0.0.0.0.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-9961">HBASE-9961</a>
*/
- public static final String STATUS_MULTICAST_BIND_ADDRESS =
- "hbase.status.multicast.bind.address.ip";
+ public static final String STATUS_MULTICAST_BIND_ADDRESS = "hbase.status.multicast.bind.address.ip";
public static final String DEFAULT_STATUS_MULTICAST_BIND_ADDRESS = "0.0.0.0";
/**
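
As context for the RETRY_BACKOFF table above: it holds multipliers, not sleep times. The effective client pause is the configured base pause times the multiplier for the current retry, so with the usual 100 ms base a multiplier of 200 means 20 s. A minimal sketch of that computation (the method name is illustrative, not part of this patch):

    // Effective retry sleep = base pause * backoff multiplier, clamped to the
    // last table entry once the retry count runs past the end of the table.
    long pauseTime(long basePauseMs, int tries) {
      int index = Math.min(tries, HConstants.RETRY_BACKOFF.length - 1);
      return basePauseMs * HConstants.RETRY_BACKOFF[index];
    }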
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
index 93f98ac..e0c3bae 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.Bu
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -136,7 +137,7 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements Coprocessor
List<List<Cell>> deleteRows = new ArrayList<List<Cell>>(rowBatchSize);
for (int i = 0; i < rowBatchSize; i++) {
List<Cell> results = new ArrayList<Cell>();
- hasMore = scanner.next(results);
+ hasMore = NextState.hasMoreValues(scanner.next(results));
if (results.size() > 0) {
deleteRows.add(results);
}
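
The change above is the pattern this revert reinstates everywhere: InternalScanner#next returns a NextState rather than a boolean, so callers unwrap it with the null-safe NextState.hasMoreValues. A minimal sketch of the shared loop shape (process() is a hypothetical per-batch handler, not part of this patch):

    // Drain a scanner under the reverted NextState API.
    List<Cell> results = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = NextState.hasMoreValues(scanner.next(results));
      process(results); // hypothetical: consume this batch of cells
      results.clear();
    } while (hasMore);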
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
index 4309cdc..2afd05e 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
@@ -80,7 +81,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService
byte[] lastRow = null;
long count = 0;
do {
- hasMore = scanner.next(results);
+ hasMore = NextState.hasMoreValues(scanner.next(results));
for (Cell kv : results) {
byte[] currentRow = CellUtil.cloneRow(kv);
if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
@@ -119,7 +120,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService
boolean hasMore = false;
long count = 0;
do {
- hasMore = scanner.next(results);
+ hasMore = NextState.hasMoreValues(scanner.next(results));
for (Cell kv : results) {
count++;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 04b88e9..a80a07e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -29,10 +30,8 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.mortbay.log.Log;
@@ -73,7 +72,10 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
public Result next() throws IOException {
values.clear();
- scanner.nextRaw(values, NoLimitScannerContext.NO_LIMIT);
+ // negative values indicate no limits
+ final long remainingResultSize = -1;
+ final int batchLimit = -1;
+ scanner.nextRaw(values, batchLimit, remainingResultSize);
if (values.isEmpty()) {
//we are done
return null;
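
With ScannerContext gone, there is no NoLimitScannerContext singleton to pass; "no limit" is signalled with negative values, as the replacement code above does. A usage sketch under the reverted signature:

    // nextRaw(results, batchLimit, remainingResultSize): negative limits mean
    // "unbounded", so this fetches a full row's worth of cells in one call.
    List<Cell> values = new ArrayList<Cell>();
    scanner.nextRaw(values, -1, -1);
    boolean done = values.isEmpty(); // an empty batch means the region is exhausted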
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index 81c933b..b6f834e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateReque
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@@ -91,7 +92,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
// qualifier can be null.
boolean hasMoreRows = false;
do {
- hasMoreRows = scanner.next(results);
+ hasMoreRows = NextState.hasMoreValues(scanner.next(results));
int listSize = results.size();
for (int i = 0; i < listSize; i++) {
temp = ci.getValue(colFamily, qualifier, results.get(i));
@@ -145,7 +146,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
}
boolean hasMoreRows = false;
do {
- hasMoreRows = scanner.next(results);
+ hasMoreRows = NextState.hasMoreValues(scanner.next(results));
int listSize = results.size();
for (int i = 0; i < listSize; i++) {
temp = ci.getValue(colFamily, qualifier, results.get(i));
@@ -199,7 +200,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
List<Cell> results = new ArrayList<Cell>();
boolean hasMoreRows = false;
do {
- hasMoreRows = scanner.next(results);
+ hasMoreRows = NextState.hasMoreValues(scanner.next(results));
int listSize = results.size();
for (int i = 0; i < listSize; i++) {
temp = ci.getValue(colFamily, qualifier, results.get(i));
@@ -253,7 +254,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
scanner = env.getRegion().getScanner(scan);
boolean hasMoreRows = false;
do {
- hasMoreRows = scanner.next(results);
+ hasMoreRows = NextState.hasMoreValues(scanner.next(results));
if (results.size() > 0) {
counter++;
}
@@ -312,7 +313,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
do {
results.clear();
- hasMoreRows = scanner.next(results);
+ hasMoreRows = NextState.hasMoreValues(scanner.next(results));
int listSize = results.size();
for (int i = 0; i < listSize; i++) {
sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
@@ -373,7 +374,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
do {
tempVal = null;
- hasMoreRows = scanner.next(results);
+ hasMoreRows = NextState.hasMoreValues(scanner.next(results));
int listSize = results.size();
for (int i = 0; i < listSize; i++) {
tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
@@ -440,7 +441,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
do {
tempVal = null;
tempWeight = null;
- hasMoreRows = scanner.next(results);
+ hasMoreRows = NextState.hasMoreValues(scanner.next(results));
int listSize = results.size();
for (int i = 0; i < listSize; i++) {
Cell kv = results.get(i);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e082698..4a8e7cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -141,9 +141,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
-import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
-import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
@@ -5176,7 +5175,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected Cell joinedContinuationRow = null;
protected final byte[] stopRow;
private final FilterWrapper filter;
- private ScannerContext defaultScannerContext;
+ private int batch;
protected int isScan;
private boolean filterClosed = false;
private long readPt;
@@ -5199,13 +5198,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.filter = null;
}
- /**
- * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
- * scanner context that can be used to enforce the batch limit in the event that a
- * ScannerContext is not specified during an invocation of next/nextRaw
- */
- defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
-
+ this.batch = scan.getBatch();
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
this.stopRow = null;
} else {
@@ -5266,7 +5259,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public int getBatch() {
- return this.defaultScannerContext.getBatchLimit();
+ return this.batch;
}
/**
@@ -5281,14 +5274,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
- public boolean next(List<Cell> outResults)
+ public NextState next(List<Cell> outResults)
throws IOException {
// apply the batching limit by default
- return next(outResults, defaultScannerContext);
+ return next(outResults, batch);
+ }
+
+ @Override
+ public NextState next(List<Cell> outResults, int limit) throws IOException {
+ return next(outResults, limit, -1);
}
@Override
- public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
+ public synchronized NextState next(List<Cell> outResults, int limit, long remainingResultSize)
throws IOException {
if (this.filterClosed) {
throw new UnknownScannerException("Scanner was closed (timed out?) " +
@@ -5298,107 +5296,122 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.SCAN);
readRequestsCount.increment();
try {
- return nextRaw(outResults, scannerContext);
+ return nextRaw(outResults, limit, remainingResultSize);
} finally {
closeRegionOperation(Operation.SCAN);
}
}
@Override
- public boolean nextRaw(List<Cell> outResults) throws IOException {
- // Use the RegionScanner's context by default
- return nextRaw(outResults, defaultScannerContext);
+ public NextState nextRaw(List<Cell> outResults) throws IOException {
+ return nextRaw(outResults, batch);
}
@Override
- public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
+ public NextState nextRaw(List<Cell> outResults, int limit)
+ throws IOException {
+ return nextRaw(outResults, limit, -1);
+ }
+
+ @Override
+ public NextState nextRaw(List<Cell> outResults, int batchLimit, long remainingResultSize)
throws IOException {
if (storeHeap == null) {
// scanner is closed
throw new UnknownScannerException("Scanner was closed");
}
- boolean moreValues;
+ NextState state;
if (outResults.isEmpty()) {
// Usually outResults is empty. This is true when next is called
// to handle scan or get operation.
- moreValues = nextInternal(outResults, scannerContext);
+ state = nextInternal(outResults, batchLimit, remainingResultSize);
} else {
List<Cell> tmpList = new ArrayList<Cell>();
- moreValues = nextInternal(tmpList, scannerContext);
+ state = nextInternal(tmpList, batchLimit, remainingResultSize);
outResults.addAll(tmpList);
}
+ // Invalid states should never be returned. Receiving an invalid state means that we have
+ // no clue how to proceed. Throw an exception.
+ if (!NextState.isValidState(state)) {
+ throw new IOException("Invalid state returned from nextInternal. state:" + state);
+ }
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
- if (!scannerContext.partialResultFormed()) resetFilters();
+ if (!state.sizeLimitReached()) resetFilters();
if (isFilterDoneInternal()) {
- moreValues = false;
+ state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize());
}
- return moreValues;
+ return state;
}
/**
- * @return true if more cells exist after this batch, false if scanner is done
+ * @return the state the joinedHeap returned on the call to
+ * {@link KeyValueHeap#next(List, int, long)}
*/
- private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
+ private NextState populateFromJoinedHeap(List<Cell> results, int limit, long resultSize)
throws IOException {
assert joinedContinuationRow != null;
- boolean moreValues =
- populateResult(results, this.joinedHeap, scannerContext,
+ NextState state =
+ populateResult(results, this.joinedHeap, limit, resultSize,
joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
joinedContinuationRow.getRowLength());
-
- if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
+ if (state != null && !state.batchLimitReached() && !state.sizeLimitReached()) {
// We are done with this row, reset the continuation.
joinedContinuationRow = null;
}
// As the data is obtained from two independent heaps, we need to
// ensure that result list is sorted, because Result relies on that.
Collections.sort(results, comparator);
- return moreValues;
+ return state;
}
/**
* Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
* reached, or remainingResultSize (if not -1) is reached
* @param heap KeyValueHeap to fetch data from. It must be positioned on correct row before call.
- * @param scannerContext
+ * @param remainingResultSize The remaining space within our result size limit. A negative value
+ * indicates no limit
+ * @param batchLimit Max amount of KVs to place in result list, -1 means no limit.
* @param currentRow Byte array with key we are fetching.
* @param offset offset for currentRow
* @param length length for currentRow
* @return state of last call to {@link KeyValueHeap#next()}
*/
- private boolean populateResult(List<Cell> results, KeyValueHeap heap,
- ScannerContext scannerContext, byte[] currentRow, int offset, short length)
- throws IOException {
+ private NextState populateResult(List<Cell> results, KeyValueHeap heap, int batchLimit,
+ long remainingResultSize, byte[] currentRow, int offset, short length) throws IOException {
Cell nextKv;
boolean moreCellsInRow = false;
- boolean tmpKeepProgress = scannerContext.getKeepProgress();
- // Scanning between column families and thus the scope is between cells
- LimitScope limitScope = LimitScope.BETWEEN_CELLS;
+ long accumulatedResultSize = 0;
+ List<Cell> tmpResults = new ArrayList<Cell>();
do {
- // We want to maintain any progress that is made towards the limits while scanning across
- // different column families. To do this, we toggle the keep progress flag on during calls
- // to the StoreScanner to ensure that any progress made thus far is not wiped away.
- scannerContext.setKeepProgress(true);
- heap.next(results, scannerContext);
- scannerContext.setKeepProgress(tmpKeepProgress);
+ int remainingBatchLimit = batchLimit - results.size();
+ NextState heapState =
+ heap.next(tmpResults, remainingBatchLimit, remainingResultSize - accumulatedResultSize);
+ results.addAll(tmpResults);
+ accumulatedResultSize += calculateResultSize(tmpResults, heapState);
+ tmpResults.clear();
+
+ if (batchLimit > 0 && results.size() == batchLimit) {
+ return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, accumulatedResultSize);
+ }
nextKv = heap.peek();
moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
-
- if (scannerContext.checkBatchLimit(limitScope)) {
- return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
- } else if (scannerContext.checkSizeLimit(limitScope)) {
- ScannerContext.NextState state =
- moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
- return scannerContext.setScannerState(state).hasMoreValues();
+ boolean sizeLimitReached =
+ remainingResultSize > 0 && accumulatedResultSize >= remainingResultSize;
+ if (moreCellsInRow && sizeLimitReached) {
+ return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, accumulatedResultSize);
}
} while (moreCellsInRow);
- return nextKv != null;
+ if (nextKv != null) {
+ return NextState.makeState(NextState.State.MORE_VALUES, accumulatedResultSize);
+ } else {
+ return NextState.makeState(NextState.State.NO_MORE_VALUES, accumulatedResultSize);
+ }
}
/**
@@ -5416,6 +5429,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length);
}
+ /**
+ * Calculates the size of the results. If the state of the scanner that these results came from
+ * indicates that an estimate of the result size has already been generated, we can skip the
+ * calculation and use that instead.
+ * @param results List of cells we want to calculate size of
+ * @param state The state returned from the scanner that generated these results
+ * @return aggregate size of results
+ */
+ private long calculateResultSize(List<Cell> results, NextState state) {
+ if (results == null || results.isEmpty()) return 0;
+
+ // In general, the state should contain the estimate because the result size is used to
+ // determine when the scan has exceeded its size limit. If the estimate is contained in the
+ // state then we can avoid an unnecessary calculation.
+ if (state != null && state.hasResultSizeEstimate()) return state.getResultSize();
+
+ long size = 0;
+ for (Cell c : results) {
+ size += CellUtil.estimatedHeapSizeOfWithoutTags(c);
+ }
+
+ return size;
+ }
+
/*
* @return True if a filter rules the scanner is over, done.
*/
@@ -5428,37 +5465,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return this.filter != null && this.filter.filterAllRemaining();
}
- private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
+ private NextState nextInternal(List<Cell> results, int batchLimit, long remainingResultSize)
throws IOException {
if (!results.isEmpty()) {
throw new IllegalArgumentException("First parameter should be an empty list");
}
- if (scannerContext == null) {
- throw new IllegalArgumentException("Scanner context cannot be null");
- }
+ // Estimate of the size (heap size) of the results returned from this method
+ long resultSize = 0;
RpcCallContext rpcCall = RpcServer.getCurrentCall();
-
- // Save the initial progress from the Scanner context in these local variables. The progress
- // may need to be reset a few times if rows are being filtered out so we save the initial
- // progress.
- int initialBatchProgress = scannerContext.getBatchProgress();
- long initialSizeProgress = scannerContext.getSizeProgress();
-
// The loop here is used only when at some point during the next we determine
// that due to effects of filters or otherwise, we have an empty row in the result.
// Then we loop and try again. Otherwise, we must get out on the first iteration via return,
// "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
// and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
while (true) {
- // Starting to scan a new row. Reset the scanner progress according to whether or not
- // progress should be kept.
- if (scannerContext.getKeepProgress()) {
- // Progress should be kept. Reset to initial values seen at start of method invocation.
- scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
- } else {
- scannerContext.clearProgress();
- }
-
if (rpcCall != null) {
// If a user specifies a too-restrictive or too-slow scanner, the
// client might time out and disconnect while the server side
@@ -5486,24 +5506,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
boolean stopRow = isStopRow(currentRow, offset, length);
- // When has filter row is true it means that the all the cells for a particular row must be
- // read before a filtering decision can be made. This means that filters where hasFilterRow
- // run the risk of encountering out of memory errors in the case that they are applied to a
- // table that has very large rows.
boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();
// If filter#hasFilterRow is true, partial results are not allowed since allowing them
// would prevent the filters from being evaluated. Thus, if it is true, change the
- // scope of any limits that could potentially create partial results to
- // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
- if (hasFilterRow) {
+ // remainingResultSize to -1 so that the entire row's worth of cells are fetched.
+ if (hasFilterRow && remainingResultSize > 0) {
+ remainingResultSize = -1;
if (LOG.isTraceEnabled()) {
- LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
- + " formed. Changing scope of limits that may create partials");
+ LOG.trace("filter#hasFilterRow is true which prevents partial results from being " +
+ " formed. The remainingResultSize of: " + remainingResultSize + " will not " +
+ " be considered when fetching the cells for this row.");
}
- scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
}
+ NextState joinedHeapState;
// Check if we were getting data from the joinedHeap and hit the limit.
// If not, then it's main path - getting results from storeHeap.
if (joinedContinuationRow == null) {
@@ -5512,30 +5529,47 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (hasFilterRow) {
filter.filterRowCells(results);
}
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+ return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize);
}
// Check if rowkey filter wants to exclude this row. If so, loop to next.
// Technically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(currentRow, offset, length)) {
boolean moreRows = nextRow(currentRow, offset, length);
- if (!moreRows) {
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- }
+ if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize);
results.clear();
continue;
}
- // Ok, we are good, let's try to get some results from the main heap.
- populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
+ NextState storeHeapState =
+ populateResult(results, this.storeHeap, batchLimit, remainingResultSize, currentRow,
+ offset, length);
+ resultSize += calculateResultSize(results, storeHeapState);
+ // Invalid states should never be returned. If one is seen, throw exception
+ // since we have no way of telling how we should proceed
+ if (!NextState.isValidState(storeHeapState)) {
+ throw new IOException("NextState returned from call storeHeap was invalid");
+ }
- if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
+ // Ok, we are good, let's try to get some results from the main heap.
+ if (storeHeapState.batchLimitReached()) {
if (hasFilterRow) {
throw new IncompatibleFilterException(
- "Filter whose hasFilterRow() returns true is incompatible with scans that must "
- + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
+ "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
}
- return true;
+ // We hit the batch limit.
+ return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, resultSize);
+ } else if (storeHeapState.sizeLimitReached()) {
+ if (hasFilterRow) {
+ // We try to guard against this case above when remainingResultSize is set to -1 if
+ // hasFilterRow is true. In the event that the guard doesn't work, an exception must be
+ // thrown
+ throw new IncompatibleFilterException(
+ "Filter whose hasFilterRows() returns true is incompatible with scans that"
+ + " return partial results");
+ }
+ // We hit the size limit.
+ return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize);
}
Cell nextKv = this.storeHeap.peek();
stopRow = nextKv == null ||
@@ -5548,31 +5582,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
if (hasFilterRow) {
ret = filter.filterRowCellsWithRet(results);
-
- // We don't know how the results have changed after being filtered. Must set progress
- // according to contents of results now.
- if (scannerContext.getKeepProgress()) {
- scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
- } else {
- scannerContext.clearProgress();
- }
- scannerContext.incrementBatchProgress(results.size());
- for (Cell cell : results) {
- scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
- }
}
if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
results.clear();
boolean moreRows = nextRow(currentRow, offset, length);
- if (!moreRows) {
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- }
+ if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
if (!stopRow) continue;
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+ return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
}
// Ok, we are done with storeHeap for this row.
@@ -5590,24 +5610,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
currentRow, offset, length));
if (mayHaveData) {
joinedContinuationRow = current;
- populateFromJoinedHeap(results, scannerContext);
-
- if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
- return true;
+ joinedHeapState =
+ populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize);
+ resultSize +=
+ joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ?
+ joinedHeapState.getResultSize() : 0;
+ if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) {
+ return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize);
}
}
}
} else {
// Populating from the joined heap was stopped by limits, populate some more.
- populateFromJoinedHeap(results, scannerContext);
- if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
- return true;
+ joinedHeapState =
+ populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize);
+ resultSize +=
+ joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ?
+ joinedHeapState.getResultSize() : 0;
+ if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) {
+ return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize);
}
}
// We may have just called populateFromJoinedMap and hit the limits. If that is
// the case, we need to call it again on the next next() invocation.
if (joinedContinuationRow != null) {
- return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
+ return NextState.makeState(NextState.State.MORE_VALUES, resultSize);
}
// Finally, we are done with both joinedHeap and storeHeap.
@@ -5615,17 +5642,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) {
boolean moreRows = nextRow(currentRow, offset, length);
- if (!moreRows) {
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- }
+ if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
if (!stopRow) continue;
}
// We are done. Return the result.
if (stopRow) {
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+ return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize);
} else {
- return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
+ return NextState.makeState(NextState.State.MORE_VALUES, resultSize);
}
}
}
@@ -7244,7 +7269,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean done;
do {
kvs.clear();
- done = scanner.next(kvs);
+ done = NextState.hasMoreValues(scanner.next(kvs));
if (kvs.size() > 0) LOG.info(kvs);
} while (done);
} finally {
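
The size-limit bookkeeping reinstated in populateResult above rests on a per-batch heap-size estimate: when the NextState does not already carry one, the caller recomputes it cell by cell. A minimal sketch of that fallback (mirroring calculateResultSize in the hunk above; the method name is illustrative):

    // Estimate the heap size of a batch of cells; used to decide whether the
    // accumulated size has met or exceeded a positive remainingResultSize.
    long estimateResultSize(List<Cell> cells) {
      long size = 0;
      for (Cell c : cells) {
        size += CellUtil.estimatedHeapSizeOfWithoutTags(c);
      }
      return size;
    }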
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
index f73e363..ea5a75f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
@@ -42,21 +42,218 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public interface InternalScanner extends Closeable {
/**
+ * This class encapsulates all the meaningful state information that we would like to know about
+ * after a call to {@link InternalScanner#next(List)}. While this is not an enum, a restriction on
+ * the possible states is implied through the exposed {@link #makeState(State)} method.
+ */
+ public static class NextState {
+ /**
+ * The possible states we want to restrict ourselves to. This enum is not sufficient to
+ * encapsulate all of the state information since some of the fields of the state must be
+ * dynamic (e.g. resultSize).
+ */
+ public enum State {
+ MORE_VALUES(true),
+ NO_MORE_VALUES(false),
+ SIZE_LIMIT_REACHED(true),
+ BATCH_LIMIT_REACHED(true);
+
+ private boolean moreValues;
+
+ private State(final boolean moreValues) {
+ this.moreValues = moreValues;
+ }
+
+ /**
+ * @return true when the state indicates that more values may follow those that have been
+ * returned
+ */
+ public boolean hasMoreValues() {
+ return this.moreValues;
+ }
+ }
+
+ /**
+ * state variables
+ */
+ private final State state;
+ private long resultSize;
+
+ /**
+ * Value to use for resultSize when the size has not been calculated. Must be a negative number
+ * so that {@link NextState#hasResultSizeEstimate()} returns false.
+ */
+ private static final long DEFAULT_RESULT_SIZE = -1;
+
+ private NextState(State state, long resultSize) {
+ this.state = state;
+ this.resultSize = resultSize;
+ }
+
+ /**
+ * @param state
+ * @return An instance of {@link NextState} where the size of the results returned from the call
+ * to {@link InternalScanner#next(List)} is unknown. It is the responsibility of the
+ * caller of {@link InternalScanner#next(List)} to calculate the result size if needed
+ */
+ public static NextState makeState(final State state) {
+ return makeState(state, DEFAULT_RESULT_SIZE);
+ }
+
+ /**
+ * @param state
+ * @param resultSize
+ * @return An instance of {@link NextState} where the size of the values returned from the call
+ * to {@link InternalScanner#next(List)} is known. The caller can avoid recalculating
+ * the result size by using the cached value retrievable via {@link #getResultSize()}
+ */
+ public static NextState makeState(final State state, long resultSize) {
+ switch (state) {
+ case MORE_VALUES:
+ return createMoreValuesState(resultSize);
+ case NO_MORE_VALUES:
+ return createNoMoreValuesState(resultSize);
+ case BATCH_LIMIT_REACHED:
+ return createBatchLimitReachedState(resultSize);
+ case SIZE_LIMIT_REACHED:
+ return createSizeLimitReachedState(resultSize);
+ default:
+ // If the state is not recognized, default to no more value state
+ return createNoMoreValuesState(resultSize);
+ }
+ }
+
+ /**
+ * Convenience method for creating a state that indicates that more values can be scanned
+ * @param resultSize estimate of the size (heap size) of the values returned from the call to
+ * {@link InternalScanner#next(List)}
+ */
+ private static NextState createMoreValuesState(long resultSize) {
+ return new NextState(State.MORE_VALUES, resultSize);
+ }
+
+ /**
+ * Convenience method for creating a state that indicates that no more values can be scanned.
+ * @param resultSize estimate of the size (heap size) of the values returned from the call to
+ * {@link InternalScanner#next(List)}
+ */
+ private static NextState createNoMoreValuesState(long resultSize) {
+ return new NextState(State.NO_MORE_VALUES, resultSize);
+ }
+
+ /**
+ * Convenience method for creating a state that indicates that the scan stopped because the
+ * batch limit was exceeded
+ * @param resultSize estimate of the size (heap size) of the values returned from the call to
+ * {@link InternalScanner#next(List)}
+ */
+ private static NextState createBatchLimitReachedState(long resultSize) {
+ return new NextState(State.BATCH_LIMIT_REACHED, resultSize);
+ }
+
+ /**
+ * Convenience method for creating a state that indicates that the scan stopped due to the size
+ * limit
+ * @param resultSize estimate of the size (heap size) of the values returned from the call to
+ * {@link InternalScanner#next(List)}
+ */
+ private static NextState createSizeLimitReachedState(long resultSize) {
+ return new NextState(State.SIZE_LIMIT_REACHED, resultSize);
+ }
+
+ /**
+ * @return true when the scanner has more values to be scanned following the values returned by
+ * the call to {@link InternalScanner#next(List)}
+ */
+ public boolean hasMoreValues() {
+ return this.state.hasMoreValues();
+ }
+
+ /**
+ * @return true when the scanner had to stop scanning because it reached the batch limit
+ */
+ public boolean batchLimitReached() {
+ return this.state == State.BATCH_LIMIT_REACHED;
+ }
+
+ /**
+ * @return true when the scanner had to stop scanning because it reached the size limit
+ */
+ public boolean sizeLimitReached() {
+ return this.state == State.SIZE_LIMIT_REACHED;
+ }
+
+ /**
+ * @return The size (heap size) of the values that were returned from the call to
+ * {@link InternalScanner#next(List)}. This value should only be used if
+ * {@link #hasResultSizeEstimate()} returns true.
+ */
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ /**
+ * @return true when an estimate for the size of the values returned by
+ * {@link InternalScanner#next(List)} was provided. If false, it is the responsibility
+ * of the caller to calculate the result size
+ */
+ public boolean hasResultSizeEstimate() {
+ return resultSize >= 0;
+ }
+
+ @Override
+ public String toString() {
+ return "State: " + state + " resultSize: " + resultSize;
+ }
+
+ /**
+ * Helper method to centralize all checks as to whether or not the state is valid.
+ * @param state
+ * @return true when the state is valid
+ */
+ public static boolean isValidState(NextState state) {
+ return state != null;
+ }
+
+ /**
+ * @param state
+ * @return true when the state is non null and indicates that more values exist
+ */
+ public static boolean hasMoreValues(NextState state) {
+ return state != null && state.hasMoreValues();
+ }
+ }
+
+ /**
* Grab the next row's worth of values.
* @param results return output array
- * @return true if more rows exist after this one, false if scanner is done
+ * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
+ * one, false if scanner is done
* @throws IOException e
*/
- boolean next(List<Cell> results) throws IOException;
+ NextState next(List<Cell> results) throws IOException;
/**
- * Grab the next row's worth of values.
+ * Grab the next row's worth of values with a limit on the number of values to return.
+ * @param result return output array
+ * @param limit limit on row count to get
+ * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
+ * one, false if scanner is done
+ * @throws IOException e
+ */
+ NextState next(List<Cell> result, int limit) throws IOException;
+
+ /**
+ * Grab the next row's worth of values with a limit on the number of values to return as well as a
+ * restriction on the size of the list of values that are returned.
* @param result return output array
- * @param scannerContext
- * @return true if more rows exist after this one, false if scanner is done
+ * @param limit limit on row count to get
+ * @param remainingResultSize limit on the size of the result being returned
+ * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
+ * one, false if scanner is done
* @throws IOException e
*/
- boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException;
+ NextState next(List<Cell> result, int limit, long remainingResultSize) throws IOException;
/**
* Closes the scanner and releases any resources it has allocated
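
Seen from the caller's side, the NextState returned above distinguishes why a batch ended, which is how the server side decides whether a partial Result is being formed. A hedged sketch of a consumer (batchLimit and remainingResultSize are whatever limits the caller chose):

    NextState state = scanner.next(results, batchLimit, remainingResultSize);
    if (!NextState.isValidState(state)) {
      throw new IOException("Invalid state returned from next");
    }
    if (state.sizeLimitReached()) {
      // Stopped mid-row on size: a partial row may be in results, so per-row
      // state (e.g. filters) must not be reset yet.
    } else if (state.batchLimitReached()) {
      // Stopped mid-row on cell count; more cells of this row remain.
    }
    boolean more = state.hasMoreValues(); // whether another call is worthwhile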
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 6e7788a..beb23cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -27,7 +27,6 @@ import java.util.PriorityQueue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
/**
* Implements a heap merge across any number of KeyValueScanners.
@@ -129,20 +128,26 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
* This can ONLY be called when you are using Scanners that implement InternalScanner as well as
* KeyValueScanner (a {@link StoreScanner}).
* @param result
- * @return true if more rows exist after this one, false if scanner is done
+ * @param limit
+ * @return state where NextState#hasMoreValues() is true if more keys exist after this
+ * one, false if scanner is done
*/
- @Override
- public boolean next(List<Cell> result) throws IOException {
- return next(result, NoLimitScannerContext.NO_LIMIT);
+ public NextState next(List<Cell> result, int limit) throws IOException {
+ return next(result, limit, -1);
}
- @Override
- public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ public NextState next(List<Cell> result, int limit, long remainingResultSize) throws IOException {
if (this.current == null) {
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+ return NextState.makeState(NextState.State.NO_MORE_VALUES);
}
InternalScanner currentAsInternal = (InternalScanner)this.current;
- boolean moreCells = currentAsInternal.next(result, scannerContext);
+ NextState state = currentAsInternal.next(result, limit, remainingResultSize);
+ // Invalid states should never be returned. Receiving an invalid state means that we have
+ // no clue how to proceed. Throw an exception.
+ if (!NextState.isValidState(state)) {
+ throw new IOException("Invalid state returned from InternalScanner#next");
+ }
+ boolean mayContainMoreRows = NextState.hasMoreValues(state);
Cell pee = this.current.peek();
/*
* By definition, any InternalScanner must return false only when it has no
@@ -151,16 +156,31 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
* more efficient to close scanners which are not needed than keep them in
* the heap. This is also required for certain optimizations.
*/
- if (pee == null || !moreCells) {
+ if (pee == null || !mayContainMoreRows) {
this.current.close();
} else {
this.heap.add(this.current);
}
this.current = pollRealKV();
if (this.current == null) {
- moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+ state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize());
}
- return moreCells;
+ return state;
+ }
+
+ /**
+ * Gets the next row of keys from the top-most scanner.
+ * <p>
+ * This method takes care of updating the heap.
+ * <p>
+ * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
+ * KeyValueScanner (a {@link StoreScanner}).
+ * @param result
+ * @return state where NextState#hasMoreValues() is true if more keys exist after this
+ * one, false if scanner is done
+ */
+ public NextState next(List<Cell> result) throws IOException {
+ return next(result, -1);
}
protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
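[Editor's note] The overloads restored above form a sentinel chain: each narrower signature delegates downward, with -1 standing in for "no limit" (next(result) calls next(result, -1), which calls next(result, -1, -1)). A compact restatement of that convention; the helper name is invented for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.KeyValueHeap;

// Fetch one row with neither a batch limit nor a size limit: passing -1
// for both arguments is equivalent to calling heap.next(row).
static NextState nextRowUnlimited(KeyValueHeap heap) throws IOException {
  List<Cell> row = new ArrayList<Cell>();
  return heap.next(row, -1, -1);
}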
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
deleted file mode 100644
index 99a371a..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-/**
- * This is a special {@link ScannerContext} subclass that is designed to be used globally when
- * limits should not be enforced during invocations of {@link InternalScanner#next(java.util.List)}
- * or {@link RegionScanner#next(java.util.List)}.
- * <p>
- * Instances of {@link NoLimitScannerContext} are immutable after construction. Any attempt to
- * change the limits or progress of a {@link NoLimitScannerContext} will fail silently. The net
- * effect is that all limit checks will return false, thus indicating that a limit has not been
- * reached.
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
-@InterfaceStability.Evolving
-public class NoLimitScannerContext extends ScannerContext {
-
- public NoLimitScannerContext() {
- super(false, null);
- }
-
- /**
- * Use this instance whenever limits do not need to be enforced.
- */
- public static ScannerContext NO_LIMIT = new NoLimitScannerContext();
-
- @Override
- void setKeepProgress(boolean keepProgress) {
- // Do nothing. NoLimitScannerContext instances are immutable post-construction
- }
-
- @Override
- void setBatchProgress(int batchProgress) {
- // Do nothing. NoLimitScannerContext instances are immutable post-construction
- }
-
- @Override
- void setSizeProgress(long sizeProgress) {
- // Do nothing. NoLimitScannerContext instances are immutable post-construction
- }
-
- @Override
- void setProgress(int batchProgress, long sizeProgress) {
- // Do nothing. NoLimitScannerContext instances are immutable post-construction
- }
-
- @Override
- void setSizeLimitScope(LimitScope scope) {
- // Do nothing. NoLimitScannerContext instances are immutable post-construction
- }
-
- @Override
- NextState setScannerState(NextState state) {
- // Do nothing. NoLimitScannerContext instances are immutable post-construction
- return state;
- }
-
- @Override
- boolean checkBatchLimit(LimitScope checkerScope) {
- // No limits can be specified, thus return false to indicate no limit has been reached.
- return false;
- }
-
- @Override
- boolean checkSizeLimit(LimitScope checkerScope) {
- // No limits can be specified, thus return false to indicate no limit has been reached.
- return false;
- }
-
- @Override
- boolean checkAnyLimitReached(LimitScope checkerScope) {
- return false;
- }
-}
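[Editor's note] With NoLimitScannerContext deleted, the NO_LIMIT singleton shown above has no successor object; "no limit" is instead spelled with negative sentinel arguments at each call site. A minimal before/after sketch, where the scanner and results variables are illustrative:

// Before this revert (ScannerContext API, deleted above):
//   boolean more = scanner.next(results, NoLimitScannerContext.NO_LIMIT);
// After this revert: negative batch and size arguments mean "unbounded".
NextState state = scanner.next(results, -1, -1);
boolean more = NextState.hasMoreValues(state);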
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 10e39a1..1508a15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -105,6 +105,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
@@ -118,8 +120,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfiguratio
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
@@ -151,10 +151,10 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
-import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -2236,53 +2236,61 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// correct ordering of partial results and so we prevent partial results from being
// formed.
boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
- boolean allowPartialResults =
+ boolean enforceMaxResultSizeAtCellLevel =
clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
- boolean moreRows = false;
-
- final LimitScope sizeScope =
- allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
-
- // Configure with limits for this RPC. Set keep progress true since size progress
- // towards size limit should be kept between calls to nextRaw
- ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
- contextBuilder.setSizeLimit(sizeScope, maxResultSize);
- contextBuilder.setBatchLimit(scanner.getBatch());
- ScannerContext scannerContext = contextBuilder.build();
+ NextState state = null;
while (i < rows) {
// Stop collecting results if we have exceeded maxResultSize
- if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS)) {
+ if (currentScanResultSize >= maxResultSize) {
builder.setMoreResultsInRegion(true);
break;
}
- // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
- // batch limit is a limit on the number of cells per Result. Thus, if progress is
- // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
- // reset the batch progress between nextRaw invocations since we don't want the
- // batch progress from previous calls to affect future calls
- scannerContext.setBatchProgress(0);
+ // A negative remainingResultSize communicates that there is no limit on the size
+ // of the results.
+ final long remainingResultSize =
+ enforceMaxResultSizeAtCellLevel ? maxResultSize - currentScanResultSize
+ : -1;
// Collect values to be returned here
- moreRows = scanner.nextRaw(values, scannerContext);
-
+ state = scanner.nextRaw(values, scanner.getBatch(), remainingResultSize);
+ // Invalid states should never be returned. If one is seen, throw exception
+ // to stop the scan -- We have no way of telling how we should proceed
+ if (!NextState.isValidState(state)) {
+ throw new IOException("NextState returned from call to nextRaw was invalid");
+ }
if (!values.isEmpty()) {
+ // The state should always contain an estimate of the result size because that
+ // estimate must be used to decide when partial results are formed.
+ boolean skipResultSizeCalculation = state.hasResultSizeEstimate();
+ if (skipResultSizeCalculation) currentScanResultSize += state.getResultSize();
+
for (Cell cell : values) {
totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
+
+ // If the calculation can't be skipped, then do it now.
+ if (!skipResultSizeCalculation) {
+ currentScanResultSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell);
+ }
}
- final boolean partial = scannerContext.partialResultFormed();
+ // The size limit was reached. This means there are more cells remaining in
+ // the row but we had to stop because we exceeded our max result size. This
+ // indicates that we are returning a partial result
+ final boolean partial = state != null && state.sizeLimitReached();
results.add(Result.create(values, null, stale, partial));
i++;
}
- if (!moreRows) {
+ if (!NextState.hasMoreValues(state)) {
break;
}
values.clear();
}
-
- if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS) || i >= rows ||
- moreRows) {
+ // currentScanResultSize >= maxResultSize should be functionally equivalent to
+ // state.sizeLimitReached()
+ if (null != state
+ && (currentScanResultSize >= maxResultSize || i >= rows || state
+ .hasMoreValues())) {
// We stopped prematurely
builder.setMoreResultsInRegion(true);
} else {
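[Editor's note] To make the bookkeeping in the scan loop above easier to follow, here is the same accounting isolated into a hypothetical helper: compute the per-call size budget, validate the returned state, then update the running size either from the scanner's own estimate or by per-cell fallback. The names mirror the reverted code; the helper itself is not part of the patch.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.RegionScanner;

// Returns the updated running result size after one nextRaw call.
static long scanOnce(RegionScanner scanner, List<Cell> values,
    long currentScanResultSize, long maxResultSize,
    boolean enforceMaxResultSizeAtCellLevel) throws IOException {
  // Budget for this call: the remaining size if partial results are
  // allowed, else -1 (no per-call size limit).
  final long remainingResultSize = enforceMaxResultSizeAtCellLevel
      ? maxResultSize - currentScanResultSize
      : -1;
  NextState state = scanner.nextRaw(values, scanner.getBatch(), remainingResultSize);
  if (!NextState.isValidState(state)) {
    throw new IOException("NextState returned from call to nextRaw was invalid");
  }
  if (state.hasResultSizeEstimate()) {
    // Trust the scanner's own estimate of what this call returned.
    return currentScanResultSize + state.getResultSize();
  }
  long size = currentScanResultSize;
  for (Cell cell : values) {
    // No estimate available: fall back to per-cell heap accounting.
    size += CellUtil.estimatedHeapSizeOfWithoutTags(cell);
  }
  return size;
}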
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index 66e087b..26f9aef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
* RegionScanner describes iterators over rows in an HRegion.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
-@InterfaceStability.Evolving
+@InterfaceStability.Stable
public interface RegionScanner extends InternalScanner {
/**
* @return The RegionInfo for this scanner.
@@ -74,22 +74,35 @@ public interface RegionScanner extends InternalScanner {
int getBatch();
/**
- * Grab the next row's worth of values. This is a special internal method to be called from
- * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and
- * close a region operation, and synchronize on the scanner object. Caller should maintain and
- * update metrics. See {@link #nextRaw(List, ScannerContext)}
+ * Grab the next row's worth of values with the default limit on the number of values to return.
+ * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+ * Caller must set the thread's readpoint, start and close a region operation, and synchronize on
+ * the scanner object. Caller should maintain and update metrics. See
+ * {@link #nextRaw(List, int, long)}
* @param result return output array
- * @return true if more rows exist after this one, false if scanner is done
+ * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
+ * scanner is done.
* @throws IOException e
*/
- boolean nextRaw(List<Cell> result) throws IOException;
-
+ NextState nextRaw(List<Cell> result) throws IOException;
+
+ /**
+ * Grab the next row's worth of values with the default limit on the number of values to return.
+ * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+ * Caller must set the thread's readpoint, start and close a region operation, and synchronize on
+ * the scanner object. Caller should maintain and update metrics. See
+ * {@link #nextRaw(List, int, long)}
+ * @param result return output array
+ * @param limit limit on row count to get
+ * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
+ * scanner is done.
+ * @throws IOException e
+ */
+ NextState nextRaw(List<Cell> result, int limit) throws IOException;
+
/**
- * Grab the next row's worth of values. The {@link ScannerContext} is used to enforce and track
- * any limits associated with this call. Any progress that exists in the {@link ScannerContext}
- * prior to calling this method will be LOST if {@link ScannerContext#getKeepProgress()} is false.
- * Upon returning from this method, the {@link ScannerContext} will contain information about the
- * progress made towards the limits. This is a special internal method to be called from
+ * Grab the next row's worth of values with a limit on the number of values to return as well as a
+ * limit on the heap size of those values. This is a special internal method to be called from
* coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and
 * close a region operation, and synchronize on the scanner object. Example: <code><pre>
* HRegion region = ...;
@@ -107,12 +120,13 @@ public interface RegionScanner extends InternalScanner {
* }
* </pre></code>
* @param result return output array
- * @param scannerContext The {@link ScannerContext} instance encapsulating all limits that should
- * be tracked during calls to this method. The progress towards these limits can be
- * tracked within this instance.
- * @return true if more rows exist after this one, false if scanner is done
+ * @param limit limit on row count to get
+ * @param remainingResultSize the space remaining within the restriction on the result size.
+ * Negative values indicate no limit
+ * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
+ * scanner is done.
* @throws IOException e
*/
- boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
+ NextState nextRaw(List<Cell> result, int limit, final long remainingResultSize)
throws IOException;
}
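[Editor's note] The javadoc example above is abbreviated; here is a hedged expansion of the same calling convention. Read-point setup is elided, and scanRaw() plus its argument wiring are illustrative, not part of the patch.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.RegionScanner;

static void scanRaw(HRegion region, RegionScanner scanner) throws IOException {
  // Caller obligations per the javadoc: bracket nextRaw calls in a region
  // operation and synchronize on the scanner object.
  region.startRegionOperation();
  try {
    synchronized (scanner) {
      List<Cell> cells = new ArrayList<Cell>();
      NextState state;
      do {
        // Use the scan's batch limit; -1 means no size cap for this call.
        state = scanner.nextRaw(cells, scanner.getBatch(), -1);
        // ... consume cells here (application-specific) ...
        cells.clear();
      } while (NextState.hasMoreValues(state));
    }
  } finally {
    region.closeRegionOperation();
  }
}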