From: stack@apache.org
To: commits@hbase.apache.org
Date: Fri, 17 Apr 2015 22:44:10 -0000
Subject: [2/2] hbase git commit: HBASE-13090 Progress heartbeats for long running scanners

HBASE-13090 Progress heartbeats for long running scanners

Signed-off-by: stack

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/43f24db8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/43f24db8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/43f24db8

Branch: refs/heads/branch-1.1
Commit: 43f24db82566818d02062466ac421d86ddb735d8
Parents: 5ef3645
Author: Jonathan Lawlor
Authored: Tue Mar 10 14:24:07 2015 -0700
Committer: stack
Committed: Fri Apr 17 15:43:59 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      |  49 +-
 .../hadoop/hbase/client/ScannerCallable.java    |  23 +
 .../client/ScannerCallableWithReplicas.java     |  10 +
 .../hadoop/hbase/protobuf/RequestConverter.java |   3 +
 .../src/main/resources/hbase-default.xml        |  12 +
 .../hbase/protobuf/generated/ClientProtos.java  | 349 ++++++++++--
 hbase-protocol/src/main/protobuf/Client.proto   |   7 +
 .../hbase/client/ClientSideRegionScanner.java   |   4 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  59 +-
 .../hadoop/hbase/regionserver/KeyValueHeap.java |   2 +
 .../regionserver/NoLimitScannerContext.java     |  28 +-
 .../hbase/regionserver/RSRpcServices.java       |  94 +++-
 .../hbase/regionserver/ScannerContext.java      | 124 ++++-
 .../hadoop/hbase/regionserver/StoreScanner.java |  31 ++
 .../hbase/TestPartialResultsFromClientSide.java |   3 +-
 .../coprocessor/TestCoprocessorInterface.java   |   1 -
 .../TestScannerHeartbeatMessages.java           | 538 +++++++++++++++++++
 .../compactions/TestStripeCompactionPolicy.java |   1 -
 18 files changed, 1234 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
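Context for the diff below: before heartbeats, a Scan whose filter discards nearly every cell could force the RegionServer to grind through an entire region without returning a single ScanResponse, so the client's scanner timeout would fire even though the server was making progress. With this patch the server cuts the scan short once roughly half of the effective timeout has elapsed and replies with a heartbeat. A minimal client-side sketch of the kind of scan that benefits (the table name, row key, and timeout value are illustrative assumptions, not part of this patch); no client code changes are required, since the 1.1 client advertises client_handles_heartbeats on every ScanRequest:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.filter.BinaryComparator;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SparseFilterScan {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Client-side scanner timeout (illustrative value). Without heartbeats,
        // a sparse filter could keep the server busy past this deadline.
        conf.setInt("hbase.client.scanner.timeout.period", 60000);
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("big_table"))) {
          Scan scan = new Scan();
          // Matches almost no rows, so a single next() may scan huge numbers
          // of cells; heartbeat responses keep the RPC from timing out.
          scan.setFilter(new RowFilter(CompareOp.EQUAL,
              new BinaryComparator(Bytes.toBytes("needle-row"))));
          try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
              System.out.println(Bytes.toString(result.getRow()));
            }
          }
        }
      }
    }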
http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 0c28e05..0ee29f2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -394,7 +394,6 @@ public class ClientScanner extends AbstractClientScanner { // returns an empty array if scanning is to go on and we've just // exhausted current region. values = call(callable, caller, scannerTimeout); - // When the replica switch happens, we need to do certain operations // again. The callable will openScanner with the right startkey // but we need to pick up from there. Bypass the rest of the loop @@ -483,7 +482,8 @@ public class ClientScanner extends AbstractClientScanner { // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. - List<Result> resultsToAddToCache = getResultsToAddToCache(values); + List<Result> resultsToAddToCache = + getResultsToAddToCache(values, callable.isHeartbeatMessage()); if (!resultsToAddToCache.isEmpty()) { for (Result rs : resultsToAddToCache) { cache.add(rs); @@ -495,6 +495,19 @@ public class ClientScanner extends AbstractClientScanner { this.lastResult = rs; } } + + // Caller of this method just wants a Result. If we see a heartbeat message, it means + // processing of the scan is taking a long time server side. Rather than continue to + // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing + // unnecessary delays to the caller + if (callable.isHeartbeatMessage() && cache.size() > 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Heartbeat message received and cache contains Results." + + " Breaking out of scan loop"); + } + break; + } + // We expect that the server won't have more results for us when we exhaust // the size (bytes or count) of the results returned. If the server *does* inform us that // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually @@ -508,20 +521,38 @@ public class ClientScanner extends AbstractClientScanner { // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults + } while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))); } /** + * @param remainingResultSize the remaining capacity, in bytes, before the size limit is met + * @param remainingRows the number of rows the caller still expects to receive + * @param regionHasMoreResults true when the server reported more results in the current region + * @return true when the current region has been exhausted. When the current region has been + * exhausted, the region must be changed before scanning can continue + */ + private boolean doneWithRegion(long remainingResultSize, int remainingRows, + boolean regionHasMoreResults) { + return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults; + } + + /** * This method ensures all of our book keeping regarding partial results is kept up to date. This * method should be called once we know that the results we received back from the RPC request do * not contain errors. We return a list of results that should be added to the cache.
In general, * this list will contain all NON-partial results from the input array (unless the client has * specified that they are okay with receiving partial results) + * @param resultsFromServer The array of {@link Result}s returned from the server + * @param heartbeatMessage Flag indicating whether or not the response received from the server + * represented a complete response, or a heartbeat message that was sent to keep the + * client-server connection alive * @return the list of results that should be added to the cache. * @throws IOException */ - protected List<Result> getResultsToAddToCache(Result[] resultsFromServer) throws IOException { + protected List<Result> + getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage) + throws IOException { int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize); @@ -539,10 +570,14 @@ public class ClientScanner extends AbstractClientScanner { return resultsToAddToCache; } - // If no results were returned it indicates that we have the all the partial results necessary - // to construct the complete result. + // If no results were returned it indicates that either we have all the partial results + // necessary to construct the complete result or the server had to send a heartbeat message + // to the client to keep the client-server connection alive if (resultsFromServer == null || resultsFromServer.length == 0) { - if (!partialResults.isEmpty()) { + // If this response was an empty heartbeat message, then we have not exhausted the region + // and thus there may be more partials server side that still need to be added to the partial + // list before we form the complete Result + if (!partialResults.isEmpty() && !heartbeatMessage) { resultsToAddToCache.add(Result.createCompleteResult(partialResults)); clearPartialResults(); } }
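The getResultsToAddToCache() change above is the client half of the heartbeat contract: an empty response used to mean "the accumulated partials form a complete row", while an empty heartbeat now merely means "the server ran out of time, possibly mid-row". A toy model of that rule (plain Java for illustration only; PartialTracker and its String stand-ins for Result are hypothetical, not HBase code):

    import java.util.ArrayList;
    import java.util.List;

    final class PartialTracker {
      private final List<String> partialResults = new ArrayList<String>();

      /** Returns complete rows that are safe to hand to the caller's cache. */
      List<String> onResponse(String[] resultsFromServer, boolean heartbeatMessage) {
        List<String> toCache = new ArrayList<String>();
        if (resultsFromServer == null || resultsFromServer.length == 0) {
          // Empty and NOT a heartbeat: the row is fully accumulated, emit it.
          if (!partialResults.isEmpty() && !heartbeatMessage) {
            StringBuilder complete = new StringBuilder();
            for (String piece : partialResults) {
              complete.append(piece);
            }
            toCache.add(complete.toString());
            partialResults.clear();
          }
          // Empty heartbeat: the region is not exhausted, keep accumulating.
          return toCache;
        }
        for (String piece : resultsFromServer) {
          partialResults.add(piece); // the real code also handles non-partial Results
        }
        return toCache;
      }
    }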
http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index ce61ef6..1994772 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -79,6 +79,12 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected final int id; protected boolean serverHasMoreResultsContext; protected boolean serverHasMoreResults; + + /** + * Saves whether or not the most recent response from the server was a heartbeat message. + * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} + */ + protected boolean heartbeatMessage = false; static { try { myAddress = DNS.getDefaultHost("default", "default"); @@ -194,6 +200,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { } else { Result [] rrs = null; ScanRequest request = null; + // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server + setHeartbeatMessage(false); try { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); @@ -214,6 +222,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { // See HBASE-5974 nextCallSeq++; long timestamp = System.currentTimeMillis(); + setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); // Results are returned via controller CellScanner cellScanner = controller.cellScanner(); rrs = ResponseConverter.getResults(cellScanner, response); @@ -294,6 +303,20 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { return null; } + /** + * @return true when the most recent RPC response indicated that the response was a heartbeat + * message. Heartbeat messages are sent back from the server when the processing of the + * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid + * timeouts during long running scan operations. + */ + protected boolean isHeartbeatMessage() { + return heartbeatMessage; + } + + protected void setHeartbeatMessage(boolean heartbeatMessage) { + this.heartbeatMessage = heartbeatMessage; + } + private void incRPCcallsMetrics() { if (this.scanMetrics == null) { return; http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index e32a2d2..c0335f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -273,6 +273,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { return replicaSwitched.get(); } + /** + * @return true when the most recent RPC response indicated that the response was a heartbeat + * message. Heartbeat messages are sent back from the server when the processing of the + * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid + * timeouts during long running scan operations.
+ */ + public boolean isHeartbeatMessage() { + return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage(); + } + private int addCallsForCurrentReplica( ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 62262e0..26314d9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -491,6 +491,7 @@ public final class RequestConverter { builder.setRegion(region); builder.setScan(ProtobufUtil.toScan(scan)); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeats(true); return builder.build(); } @@ -509,6 +510,7 @@ public final class RequestConverter { builder.setCloseScanner(closeScanner); builder.setScannerId(scannerId); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeats(true); return builder.build(); } @@ -529,6 +531,7 @@ public final class RequestConverter { builder.setScannerId(scannerId); builder.setNextCallSeq(nextCallSeq); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeats(true); return builder.build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-common/src/main/resources/hbase-default.xml ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 2a98d7d..86a5104 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -801,6 +801,18 @@ possible configurations would overwhelm and obscure the important. but will eventually throw a TimeoutException.</description> </property> + <property> + <name>hbase.cells.scanned.per.heartbeat.check</name> + <value>10000</value> + <description>The number of cells scanned in between heartbeat checks. Heartbeat + checks occur during the processing of scans to determine whether or not the + server should stop scanning in order to send back a heartbeat message to the + client. Heartbeat messages are used to keep the client-server connection alive + during long running scans. Small values mean that the heartbeat checks will + occur more often and thus will provide a tighter bound on the execution time of + the scan. Larger values mean that the heartbeat checks occur less frequently. + </description> + </property> <property> <name>hbase.rpc.shortoperation.timeout</name> <value>10000</value> <description>This is another version of "hbase.rpc.timeout".
For those RPC operation http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 60ab651..2991ece 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -16433,6 +16433,16 @@ public final class ClientProtos { * optional bool client_handles_partials = 7; */ boolean getClientHandlesPartials(); + + // optional bool client_handles_heartbeats = 8; + /** + * optional bool client_handles_heartbeats = 8; + */ + boolean hasClientHandlesHeartbeats(); + /** + * optional bool client_handles_heartbeats = 8; + */ + boolean getClientHandlesHeartbeats(); } /** * Protobuf type {@code ScanRequest} @@ -16549,6 +16559,11 @@ public final class ClientProtos { clientHandlesPartials_ = input.readBool(); break; } + case 64: { + bitField0_ |= 0x00000080; + clientHandlesHeartbeats_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -16713,6 +16728,22 @@ public final class ClientProtos { return clientHandlesPartials_; } + // optional bool client_handles_heartbeats = 8; + public static final int CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER = 8; + private boolean clientHandlesHeartbeats_; + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean hasClientHandlesHeartbeats() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean getClientHandlesHeartbeats() { + return clientHandlesHeartbeats_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -16721,6 +16752,7 @@ public final class ClientProtos { closeScanner_ = false; nextCallSeq_ = 0L; clientHandlesPartials_ = false; + clientHandlesHeartbeats_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -16767,6 +16799,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeBool(7, clientHandlesPartials_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeBool(8, clientHandlesHeartbeats_); + } getUnknownFields().writeTo(output); } @@ -16804,6 +16839,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(7, clientHandlesPartials_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, clientHandlesHeartbeats_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -16862,6 +16901,11 @@ public final class ClientProtos { result = result && (getClientHandlesPartials() == other.getClientHandlesPartials()); } + result = result && (hasClientHandlesHeartbeats() == other.hasClientHandlesHeartbeats()); + if (hasClientHandlesHeartbeats()) { + result = result && (getClientHandlesHeartbeats() + == other.getClientHandlesHeartbeats()); + } result = result && 
getUnknownFields().equals(other.getUnknownFields()); return result; @@ -16903,6 +16947,10 @@ public final class ClientProtos { hash = (37 * hash) + CLIENT_HANDLES_PARTIALS_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getClientHandlesPartials()); } + if (hasClientHandlesHeartbeats()) { + hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -17049,6 +17097,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000020); clientHandlesPartials_ = false; bitField0_ = (bitField0_ & ~0x00000040); + clientHandlesHeartbeats_ = false; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -17113,6 +17163,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000040; } result.clientHandlesPartials_ = clientHandlesPartials_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -17150,6 +17204,9 @@ public final class ClientProtos { if (other.hasClientHandlesPartials()) { setClientHandlesPartials(other.getClientHandlesPartials()); } + if (other.hasClientHandlesHeartbeats()) { + setClientHandlesHeartbeats(other.getClientHandlesHeartbeats()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -17588,6 +17645,39 @@ public final class ClientProtos { return this; } + // optional bool client_handles_heartbeats = 8; + private boolean clientHandlesHeartbeats_ ; + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean hasClientHandlesHeartbeats() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean getClientHandlesHeartbeats() { + return clientHandlesHeartbeats_; + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public Builder setClientHandlesHeartbeats(boolean value) { + bitField0_ |= 0x00000080; + clientHandlesHeartbeats_ = value; + onChanged(); + return this; + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public Builder clearClientHandlesHeartbeats() { + bitField0_ = (bitField0_ & ~0x00000080); + clientHandlesHeartbeats_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanRequest) } @@ -17806,6 +17896,30 @@ public final class ClientProtos { * */ boolean getMoreResultsInRegion(); + + // optional bool heartbeat_message = 9; + /** + * optional bool heartbeat_message = 9; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + boolean hasHeartbeatMessage(); + /** + * optional bool heartbeat_message = 9; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + boolean getHeartbeatMessage(); } /** * Protobuf type {@code ScanResponse} @@ -17939,6 +18053,11 @@ public final class ClientProtos { moreResultsInRegion_ = input.readBool(); break; } + case 72: { + bitField0_ |= 0x00000020; + heartbeatMessage_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -18252,6 +18371,36 @@ public final class ClientProtos { return moreResultsInRegion_; } + // optional bool heartbeat_message = 9; + public static final int HEARTBEAT_MESSAGE_FIELD_NUMBER = 9; + private boolean heartbeatMessage_; + /** + * optional bool heartbeat_message = 9; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + public boolean hasHeartbeatMessage() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional bool heartbeat_message = 9; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + public boolean getHeartbeatMessage() { + return heartbeatMessage_; + } + private void initFields() { cellsPerResult_ = java.util.Collections.emptyList(); scannerId_ = 0L; @@ -18261,6 +18410,7 @@ public final class ClientProtos { stale_ = false; partialFlagPerResult_ = java.util.Collections.emptyList(); moreResultsInRegion_ = false; + heartbeatMessage_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -18298,6 +18448,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeBool(8, moreResultsInRegion_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeBool(9, heartbeatMessage_); + } getUnknownFields().writeTo(output); } @@ -18346,6 +18499,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(8, moreResultsInRegion_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(9, heartbeatMessage_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -18400,6 +18557,11 @@ public final class ClientProtos { result = result && (getMoreResultsInRegion() == other.getMoreResultsInRegion()); } + result = result && (hasHeartbeatMessage() == other.hasHeartbeatMessage()); + if (hasHeartbeatMessage()) { + result = result && (getHeartbeatMessage() + == other.getHeartbeatMessage()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -18445,6 +18607,10 @@ public final class ClientProtos { hash = (37 * hash) + MORE_RESULTS_IN_REGION_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getMoreResultsInRegion()); } + if (hasHeartbeatMessage()) { + hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getHeartbeatMessage()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -18581,6 +18747,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000040); moreResultsInRegion_ = false; bitField0_ = (bitField0_ & ~0x00000080); + heartbeatMessage_ = false; + bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -18648,6 +18816,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000010; } result.moreResultsInRegion_ = moreResultsInRegion_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000020; + } + result.heartbeatMessage_ = heartbeatMessage_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18725,6 +18897,9 @@ public final class ClientProtos { if (other.hasMoreResultsInRegion()) { setMoreResultsInRegion(other.getMoreResultsInRegion()); } + if (other.hasHeartbeatMessage()) { + setHeartbeatMessage(other.getHeartbeatMessage()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -19561,6 +19736,67 @@ public final class ClientProtos { return this; } + // optional bool heartbeat_message = 9; + private boolean heartbeatMessage_ ; + /** + * optional bool heartbeat_message = 9; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public boolean hasHeartbeatMessage() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional bool heartbeat_message = 9; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public boolean getHeartbeatMessage() { + return heartbeatMessage_; + } + /** + * optional bool heartbeat_message = 9; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public Builder setHeartbeatMessage(boolean value) { + bitField0_ |= 0x00000100; + heartbeatMessage_ = value; + onChanged(); + return this; + } + /** + * optional bool heartbeat_message = 9; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public Builder clearHeartbeatMessage() { + bitField0_ = (bitField0_ & ~0x00000100); + heartbeatMessage_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanResponse) } @@ -32690,63 +32926,64 @@ public final class ClientProtos { "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" + "eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" + "\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" + - "\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", + "\"\342\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" + "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" + "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" + - "\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\311\001\n\014S" + - "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" + - "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" + - "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" + - "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" + - "\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\"\263" + - "\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132", - "\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132 " + - ".BulkLoadHFileRequest.FamilyPath\022\026\n\016assi" + - "gn_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family" + - "\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRes" + - "ponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServ" + - "iceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002" + - "(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014" + - "\"9\n\030CoprocessorServiceResult\022\035\n\005value\030\001 " + - "\001(\0132\016.NameBytesPair\"d\n\031CoprocessorServic" + - "eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi", - "er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" + - "l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" + - "n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" + - "2\016.NameBytesPair\"{\n\006Action\022\r\n\005index\030\001 \001(" + - "\r\022 \n\010mutation\030\002 \001(\0132\016.MutationProto\022\021\n\003g" + - "et\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.C" + - "oprocessorServiceCall\"Y\n\014RegionAction\022 \n" + - "\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomi" + - "c\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017Regi" + - "onLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\r", - "heapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrExcep" + - "tion\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Re" + - "sult\022!\n\texception\030\003 \001(\0132\016.NameBytesPair\022" + - "1\n\016service_result\030\004 \001(\0132\031.CoprocessorSer" + - "viceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLo" + - "adStats\"f\n\022RegionActionResult\022-\n\021resultO" + - "rException\030\001 \003(\0132\022.ResultOrException\022!\n\t" + - "exception\030\002 \001(\0132\016.NameBytesPair\"f\n\014Multi" + - "Request\022#\n\014regionAction\030\001 
\003(\0132\r.RegionAc" + - "tion\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 ", - "\001(\0132\n.Condition\"S\n\rMultiResponse\022/\n\022regi" + - "onActionResult\030\001 \003(\0132\023.RegionActionResul" + - "t\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006S" + - "TRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClientService\022" + - " \n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mut" + - "ate\022\016.MutateRequest\032\017.MutateResponse\022#\n\004" + - "Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBul" + - "kLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bulk" + - "LoadHFileResponse\022F\n\013ExecService\022\032.Copro" + - "cessorServiceRequest\032\033.CoprocessorServic", - "eResponse\022R\n\027ExecRegionServerService\022\032.C" + - "oprocessorServiceRequest\032\033.CoprocessorSe" + - "rviceResponse\022&\n\005Multi\022\r.MultiRequest\032\016." + - "MultiResponseBB\n*org.apache.hadoop.hbase" + - ".protobuf.generatedB\014ClientProtosH\001\210\001\001\240\001" + - "\001" + "\004\022\037\n\027client_handles_partials\030\007 \001(\010\022!\n\031cl" + + "ient_handles_heartbeats\030\010 \001(\010\"\344\001\n\014ScanRe" + + "sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" + + "ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" + + "l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r\n\005sta" + + "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" + + "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea", + "rtbeat_message\030\t \001(\010\"\263\001\n\024BulkLoadHFileRe" + + "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + + "5\n\013family_path\030\002 \003(\0132 .BulkLoadHFileRequ" + + "est.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*" + + "\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002" + + "(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 " + + "\002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002" + + "(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030" + + "\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSer" + + "viceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPai", + "r\"d\n\031CoprocessorServiceRequest\022 \n\006region" + + "\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027" + + ".CoprocessorServiceCall\"]\n\032CoprocessorSe" + + "rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" + + "ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{" + + "\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(" + + "\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014" + + "service_call\030\004 \001(\0132\027.CoprocessorServiceC" + + "all\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Re" + + "gionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030", + "\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014mem" + + "storeLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(" + + "\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(" + + "\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception\030" + + "\003 \001(\0132\016.NameBytesPair\0221\n\016service_result\030" + + "\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tload" + + 
"Stats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionA" + + "ctionResult\022-\n\021resultOrException\030\001 \003(\0132\022" + + ".ResultOrException\022!\n\texception\030\002 \001(\0132\016." + + "NameBytesPair\"f\n\014MultiRequest\022#\n\014regionA", + "ction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup" + + "\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n" + + "\rMultiResponse\022/\n\022regionActionResult\030\001 \003" + + "(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 \001" + + "(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELIN" + + "E\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReques" + + "t\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReques" + + "t\032\017.MutateResponse\022#\n\004Scan\022\014.ScanRequest" + + "\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.BulkL" + + "oadHFileRequest\032\026.BulkLoadHFileResponse\022", + "F\n\013ExecService\022\032.CoprocessorServiceReque" + + "st\032\033.CoprocessorServiceResponse\022R\n\027ExecR" + + "egionServerService\022\032.CoprocessorServiceR" + + "equest\032\033.CoprocessorServiceResponse\022&\n\005M" + + "ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" + + "rg.apache.hadoop.hbase.protobuf.generate" + + "dB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -32842,13 +33079,13 @@ public final class ClientProtos { internal_static_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", }); internal_static_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_ScanResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanResponse_descriptor, - new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", }); + new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", }); internal_static_BulkLoadHFileRequest_descriptor = getDescriptor().getMessageTypes().get(14); internal_static_BulkLoadHFileRequest_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-protocol/src/main/protobuf/Client.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index e0c370b..3a48cc8 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -275,6 +275,7 @@ message ScanRequest { optional bool close_scanner = 5; optional uint64 next_call_seq = 6; optional bool client_handles_partials = 7; + optional bool client_handles_heartbeats = 8; } /** @@ -313,6 +314,12 @@ message ScanResponse { // reasons such as the size in bytes or quantity of results accumulated. This field // will true when more results exist in the current region. 
optional bool more_results_in_region = 8; + + // This field is filled in if the server is sending back a heartbeat message. + // Heartbeat messages are sent back to the client to prevent the scanner from + // timing out. Seeing a heartbeat message communicates to the Client that the + // server would have continued to scan had the time limit not been reached. + optional bool heartbeat_message = 9; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java index 5809983..9d7bcc0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.mortbay.log.Log; @@ -72,8 +71,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner { @Override public Result next() throws IOException { values.clear(); - - scanner.nextRaw(values, NoLimitScannerContext.getInstance()); + scanner.nextRaw(values); if (values.isEmpty()) { //we are done return null; http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8103445..2afe4ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5288,8 +5288,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) - throws IOException { + public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + "after we renewed it. Could be caused by a very slow scanner " + @@ -5327,7 +5326,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi moreValues = nextInternal(tmpList, scannerContext); outResults.addAll(tmpList); } - + // If the size limit was reached it means a partial Result is being returned. Returning a // partial Result means that we should not reset the filters; filters should only be reset in // between rows @@ -5395,6 +5394,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ScannerContext.NextState state = moreCellsInRow ?
NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; return scannerContext.setScannerState(state).hasMoreValues(); + } else if (scannerContext.checkTimeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); } } while (moreCellsInRow); @@ -5443,6 +5446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // progress. int initialBatchProgress = scannerContext.getBatchProgress(); long initialSizeProgress = scannerContext.getSizeProgress(); + long initialTimeProgress = scannerContext.getTimeProgress(); // The loop here is used only when at some point during the next we determine // that due to effects of filters or otherwise, we have an empty row in the result. @@ -5454,7 +5458,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // progress should be kept. if (scannerContext.getKeepProgress()) { // Progress should be kept. Reset to initial values seen at start of method invocation. - scannerContext.setProgress(initialBatchProgress, initialSizeProgress); + scannerContext + .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress); } else { scannerContext.clearProgress(); } @@ -5502,6 +5507,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + " formed. Changing scope of limits that may create partials"); } scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS); + scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS); } // Check if we were getting data from the joinedHeap and hit the limit. @@ -5537,6 +5543,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } return true; } + Cell nextKv = this.storeHeap.peek(); stopRow = nextKv == null || isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength()); @@ -5550,12 +5557,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ret = filter.filterRowCellsWithRet(results); // We don't know how the results have changed after being filtered. Must set progress - // according to contents of results now. + // according to contents of results now. However, a change in the results should not + // affect the time progress. Thus preserve whatever time progress has been made + long timeProgress = scannerContext.getTimeProgress(); if (scannerContext.getKeepProgress()) { - scannerContext.setProgress(initialBatchProgress, initialSizeProgress); + scannerContext.setProgress(initialBatchProgress, initialSizeProgress, + initialTimeProgress); } else { scannerContext.clearProgress(); } + scannerContext.setTimeProgress(timeProgress); scannerContext.incrementBatchProgress(results.size()); for (Cell cell : results) { scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell)); @@ -5580,14 +5591,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // These values are not needed for filter to work, so we postpone their // fetch to (possibly) reduce amount of data loads from disk. if (this.joinedHeap != null) { - Cell nextJoinedKv = joinedHeap.peek(); - // If joinedHeap is pointing to some other row, try to seek to a correct one. 
- boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, - currentRow, offset, length)) - || (this.joinedHeap.requestSeek( - KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true) - && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(), - currentRow, offset, length)); + boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length); if (mayHaveData) { joinedContinuationRow = current; populateFromJoinedHeap(results, scannerContext); @@ -5631,6 +5635,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** + * @param currentRow the row key of the row currently being scanned + * @param offset the offset of the row key within the array + * @param length the length of the row key + * @return true when the joined heap may have data for the current row + * @throws IOException + */ + private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length) + throws IOException { + Cell nextJoinedKv = joinedHeap.peek(); + boolean matchCurrentRow = + nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length); + boolean matchAfterSeek = false; + + // If the next value in the joined heap does not match the current row, try to seek to the + // correct row + if (!matchCurrentRow) { + Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length); + boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true); + matchAfterSeek = + seekSuccessful && joinedHeap.peek() != null + && CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length); + } + + return matchCurrentRow || matchAfterSeek; + } + + /** * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines * both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 761267f..6433453 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -144,6 +144,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner InternalScanner currentAsInternal = (InternalScanner)this.current; boolean moreCells = currentAsInternal.next(result, scannerContext); Cell pee = this.current.peek(); + /* * By definition, any InternalScanner must return false only when it has no * further rows to be fetched. So, we can close a scanner if it returns @@ -151,6 +152,7 @@ * more efficient to close scanners which are not needed than keep them in * the heap. This is also required for certain optimizations.
*/ + if (pee == null || !moreCells) { this.current.close(); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java index 1484e80..66ed6c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java @@ -68,7 +68,22 @@ public class NoLimitScannerContext extends ScannerContext { } @Override - void setProgress(int batchProgress, long sizeProgress) { + void setTimeProgress(long timeProgress) { + // Do nothing. NoLimitScannerContext instances are immutable post-construction + } + + @Override + void updateTimeProgress() { + // Do nothing. NoLimitScannerContext instances are immutable post-construction + } + + @Override + void setProgress(int batchProgress, long sizeProgress, long timeProgress) { + // Do nothing. NoLimitScannerContext instances are immutable post-construction + } + + @Override + void clearProgress() { // Do nothing. NoLimitScannerContext instances are immutable post-construction } @@ -78,6 +93,11 @@ public class NoLimitScannerContext extends ScannerContext { } @Override + void setTimeLimitScope(LimitScope scope) { + // Do nothing. NoLimitScannerContext instances are immutable post-construction + } + + @Override NextState setScannerState(NextState state) { // Do nothing. NoLimitScannerContext instances are immutable post-construction return state; @@ -96,6 +116,12 @@ public class NoLimitScannerContext extends ScannerContext { } @Override + boolean checkTimeLimit(LimitScope checkerScope) { + // No limits can be specified, thus return false to indicate no limit has been reached. + return false; + } + + @Override boolean checkAnyLimitReached(LimitScope checkerScope) { return false; } http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 0ef1d99..a10e0c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -194,6 +194,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = "hbase.region.server.rpc.scheduler.factory.class"; + /** + * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This + * configuration exists to prevent the scenario where a time limit is specified to be so + * restrictive that the time limit is reached immediately (before any cells are scanned). 
+ */ + private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = + "hbase.region.server.rpc.minimum.scan.time.limit.delta"; + /** + * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} + */ + private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; + // Request counter. (Includes requests that are not serviced by regions.) final Counter requestCount = new Counter(); // Server to handle client requests. @@ -216,6 +228,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final int scannerLeaseTimeoutPeriod; /** + * The RPC timeout period (milliseconds) + */ + private final int rpcTimeout; + + /** + * The minimum allowable delta to use for the scan limit + */ + private final long minimumScanTimeLimitDelta; + + /** * Holder class which holds the RegionScanner and nextCallSeq together. */ private static class RegionScannerHolder { @@ -831,6 +853,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, maxScannerResultSize = rs.conf.getLong( HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); + rpcTimeout = rs.conf.getInt( + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + minimumScanTimeLimitDelta = rs.conf.getLong( + REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); // Set our address, however we need the final port that was given to rpcServer isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort()); @@ -2264,6 +2292,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean stale = (region.getRegionInfo().getReplicaId() != 0); boolean clientHandlesPartials = request.hasClientHandlesPartials() && request.getClientHandlesPartials(); + boolean clientHandlesHeartbeats = + request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); // On the server side we must ensure that the correct ordering of partial results is // returned to the client to allow them to properly reconstruct the partial results. @@ -2275,23 +2305,52 @@ public class RSRpcServices implements HBaseRPCErrorHandler, clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; boolean moreRows = false; + // Heartbeat messages occur when the processing of the ScanRequest exceeds a + // certain time threshold on the server. When the time threshold is exceeded, the + // server stops the scan and sends back whatever Results it has accumulated within + // that time period (may be empty). Since heartbeat messages have the potential to + // create partial Results (in the event that the timeout occurs in the middle of a + // row), we must only generate heartbeat messages when the client can handle both + // heartbeats AND partials + boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; + + // Default value of timeLimit is negative to indicate no timeLimit should be + // enforced. + long timeLimit = -1; + + // Set the time limit to be half of the more restrictive timeout value (one of the + // timeout values must be positive). In the event that both values are positive, the + // more restrictive of the two is used to calculate the limit.
+ if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { + long timeLimitDelta; + if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { + timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); + } else { + timeLimitDelta = + scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; + } + // Use half of whichever timeout value was more restrictive... But don't allow + // the time limit to be less than the allowable minimum (could cause an + // immediate timeout before scanning any data). + timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); + timeLimit = System.currentTimeMillis() + timeLimitDelta; + } +
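To make the limit arithmetic above concrete, a standalone sketch with assumed inputs (the 60-second values are illustrative; only the 10 ms floor comes from the new default introduced by this patch):

    public class TimeLimitMath {
      public static void main(String[] args) {
        long scannerLeaseTimeoutPeriod = 60000L; // assumed scanner lease timeout
        long rpcTimeout = 60000L;                // assumed hbase.rpc.timeout
        long minimumScanTimeLimitDelta = 10L;    // default floor added by this patch

        // Mirror of the block above: take the more restrictive timeout...
        long timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); // 60000
        // ...halve it, but never drop below the configured minimum...
        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); // 30000
        // ...then turn the delta into an absolute deadline for this one RPC.
        long timeLimit = System.currentTimeMillis() + timeLimitDelta;
        System.out.println("heartbeat deadline in " + timeLimitDelta + " ms: " + timeLimit);
      }
    }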
+ builder.setHeartbeatMessage(timeLimitReached); + } break; } values.clear(); } - if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS) || i >= rows || - moreRows) { + if (limitReached || moreRows) { // We stopped prematurely builder.setMoreResultsInRegion(true); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 6e487ca..7c8ff7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -101,7 +101,7 @@ public class ScannerContext { if (limitsToCopy != null) this.limits.copy(limitsToCopy); // Progress fields are initialized to 0 - progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0); + progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0); this.keepProgress = keepProgress; this.scannerState = DEFAULT_STATE; @@ -137,6 +137,13 @@ public class ScannerContext { progress.setSize(currentSize + size); } + /** + * Update the time progress with {@link System#currentTimeMillis()} + */ + void updateTimeProgress() { + progress.setTime(System.currentTimeMillis()); + } + int getBatchProgress() { return progress.getBatch(); } @@ -145,9 +152,14 @@ public class ScannerContext { return progress.getSize(); } - void setProgress(int batchProgress, long sizeProgress) { + long getTimeProgress() { + return progress.getTime(); + } + + void setProgress(int batchProgress, long sizeProgress, long timeProgress) { setBatchProgress(batchProgress); setSizeProgress(sizeProgress); + setTimeProgress(timeProgress); } void setSizeProgress(long sizeProgress) { @@ -158,12 +170,16 @@ public class ScannerContext { progress.setBatch(batchProgress); } + void setTimeProgress(long timeProgress) { + progress.setTime(timeProgress); + } + /** * Clear away any progress that has been made so far. All progress fields are reset to initial * values */ void clearProgress() { - progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0); + progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0); } /** @@ -172,7 +188,7 @@ public class ScannerContext { * allows the {@link NoLimitScannerContext} to cleanly override this setter and simply return the * new state, thus preserving the immutability of {@link NoLimitScannerContext} * @param state - * @return The state that + * @return The state that was passed in. */ NextState setScannerState(NextState state) { if (!NextState.isValidState(state)) { @@ -188,7 +204,8 @@ public class ScannerContext { * reached in the middle of a row. 
*/ boolean partialResultFormed() { - return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW; + return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW + || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW; } /** @@ -209,10 +226,18 @@ public class ScannerContext { /** * @param checkerScope + * @return true if the time limit can be enforced in the checker's scope + */ + boolean hasTimeLimit(LimitScope checkerScope) { + return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0; + } + + /** + * @param checkerScope * @return true if any limit can be enforced within the checker's scope */ boolean hasAnyLimit(LimitScope checkerScope) { - return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope); + return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope) || hasTimeLimit(checkerScope); } /** @@ -222,6 +247,13 @@ public class ScannerContext { limits.setSizeScope(scope); } + /** + * @param scope The scope in which the time limit will be enforced + */ + void setTimeLimitScope(LimitScope scope) { + limits.setTimeScope(scope); + } + int getBatchLimit() { return limits.getBatch(); } @@ -230,6 +262,10 @@ public class ScannerContext { return limits.getSize(); } + long getTimeLimit() { + return limits.getTime(); + } + /** * @param checkerScope The scope that the limit is being checked from * @return true when the limit is enforceable from the checker's scope and it has been reached @@ -247,11 +283,21 @@ public class ScannerContext { } /** + * @param checkerScope The scope that the limit is being checked from. The time limit is always + * checked against {@link System#currentTimeMillis()} + * @return true when the limit is enforceable from the checker's scope and it has been reached + */ + boolean checkTimeLimit(LimitScope checkerScope) { + return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime(); + } + + /** * @param checkerScope The scope that the limits are being checked from * @return true when some limit is enforceable from the checker's scope and it has been reached */ boolean checkAnyLimitReached(LimitScope checkerScope) { - return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope); + return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope) + || checkTimeLimit(checkerScope); } @Override @@ -305,6 +351,12 @@ public class ScannerContext { return this; } + public Builder setTimeLimit(LimitScope timeScope, long timeLimit) { + limits.setTime(timeLimit); + limits.setTimeScope(timeScope); + return this; + } + public Builder setBatchLimit(int batchLimit) { limits.setBatch(batchLimit); return this; @@ -328,6 +380,13 @@ public class ScannerContext { * of a row and thus a partial results was formed */ SIZE_LIMIT_REACHED_MID_ROW(true, true), + /** + * Indicates that the time limit has been reached + */ + TIME_LIMIT_REACHED(true, true), + + /** + * Special case of time limit reached to indicate that the time limit was reached in the middle + * of a row and thus a partial result was formed + */ + TIME_LIMIT_REACHED_MID_ROW(true, true), BATCH_LIMIT_REACHED(true, true); private boolean moreValues; @@ -419,6 +478,7 @@ public class ScannerContext { */ private static int DEFAULT_BATCH = -1; private static long DEFAULT_SIZE = -1L; + private static long DEFAULT_TIME = -1L; /** * Default scope that is assigned to a limit if a scope is not specified. @@ -432,19 +492,23 @@ public class ScannerContext { LimitScope sizeScope = DEFAULT_SCOPE; long size = DEFAULT_SIZE; + LimitScope timeScope = DEFAULT_SCOPE; + long time = DEFAULT_TIME; +
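// ---------------------------------------------------------------------
// Editor's note (illustrative sketch only, NOT part of this patch): how the
// pieces added above fit together. Time "progress" is an absolute timestamp
// rather than a counter, so checking the limit reduces to comparing the last
// recorded time against the deadline. The LimitScope gates where the check
// may fire: BETWEEN_CELLS permits stopping mid-row (forming a partial
// Result), BETWEEN_ROWS only at row boundaries. The 5000 ms value is
// hypothetical.
//
//   ScannerContext context = ScannerContext.newBuilder(true)
//       .setTimeLimit(LimitScope.BETWEEN_CELLS, System.currentTimeMillis() + 5000)
//       .build();
//   context.updateTimeProgress();  // records System.currentTimeMillis()
//   if (context.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
//     // Deadline passed: stop and report TIME_LIMIT_REACHED, or
//     // TIME_LIMIT_REACHED_MID_ROW if a row is only partially read.
//   }
// ---------------------------------------------------------------------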
/** * Fields keep their default values. */ LimitFields() { } - LimitFields(int batch, LimitScope sizeScope, long size) { - setFields(batch, sizeScope, size); + LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) { + setFields(batch, sizeScope, size, timeScope, time); } void copy(LimitFields limitsToCopy) { if (limitsToCopy != null) { - setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize()); + setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(), + limitsToCopy.getTimeScope(), limitsToCopy.getTime()); } } @@ -454,10 +518,12 @@ public class ScannerContext { * @param sizeScope * @param size */ - void setFields(int batch, LimitScope sizeScope, long size) { + void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) { setBatch(batch); setSizeScope(sizeScope); setSize(size); + setTimeScope(timeScope); + setTime(time); } int getBatch() { @@ -506,6 +572,36 @@ public class ScannerContext { return this.sizeScope.canEnforceLimitFromScope(checkerScope); } + long getTime() { + return this.time; + } + + void setTime(long time) { + this.time = time; + } + + /** + * @return {@link LimitScope} indicating scope in which the time limit is enforced + */ + LimitScope getTimeScope() { + return this.timeScope; + } + + /** + * Change the scope in which the time limit is enforced + */ + void setTimeScope(LimitScope scope) { + this.timeScope = scope; + } + + /** + * @param checkerScope + * @return true when the time limit can be enforced from the scope of the checker + */ + boolean canEnforceTimeLimitFromScope(LimitScope checkerScope) { + return this.timeScope.canEnforceLimitFromScope(checkerScope); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -520,6 +616,12 @@ public class ScannerContext { sb.append(", sizeScope:"); sb.append(sizeScope); + sb.append(", time:"); + sb.append(time); + + sb.append(", timeScope:"); + sb.append(timeScope); + sb.append("}"); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 665ed46..f7e06ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -83,6 +83,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected final long now; protected final int minVersions; protected final long maxRowSize; + protected final long cellsPerHeartbeatCheck; /** * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not @@ -100,6 +101,19 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT; + /** + * The number of cells scanned in between timeout checks. Specifying a larger value means that + * timeout checks will occur less frequently. Specifying a small value will lead to more frequent + * timeout checks. + */ + public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = + "hbase.cells.scanned.per.heartbeat.check"; +
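// ---------------------------------------------------------------------
// Editor's note (illustrative sketch only, NOT part of this patch): tuning
// how often the time limit is checked. Reading the clock on every cell would
// be wasteful, so StoreScanner only checks once per this many cells; smaller
// values notice the deadline sooner at a small CPU cost. For example, to
// check ten times more often than the 10000-cell default:
//
//   Configuration conf = HBaseConfiguration.create();
//   conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1000L);
//
// Non-positive values are ignored and replaced with the default (see the
// constructor change below).
// ---------------------------------------------------------------------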
+ /** + * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}. + */ + public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000; + // if heap == null and lastTop != null, you need to reseek given the key below protected Cell lastTop = null; @@ -137,9 +151,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.maxRowSize = conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT); this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall()); + + long tmpCellsPerHeartbeatCheck = + conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, + DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK); + this.cellsPerHeartbeatCheck = + tmpCellsPerHeartbeatCheck > 0 ? tmpCellsPerHeartbeatCheck + : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK; } else { this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT; this.scanUsePread = scan.isSmall(); + this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK; } // We look up row-column Bloom filters for multi-column queries as part of @@ -458,6 +480,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public boolean next(List outResult, ScannerContext scannerContext) throws IOException { lock.lock(); + try { if (scannerContext == null) { throw new IllegalArgumentException("Scanner context cannot be null"); @@ -507,6 +530,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner long totalBytesRead = 0; LOOP: while((cell = this.heap.peek()) != null) { + // Update and check the time limit based on the configured value of cellsPerHeartbeatCheck + if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { + scannerContext.updateTimeProgress(); + if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { + return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); + } + } + if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
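// ---------------------------------------------------------------------
// Editor's note (illustrative sketch only, NOT part of this patch): the
// check above is an amortized deadline check - the scanner pays for a
// System.currentTimeMillis() call only once every cellsPerHeartbeatCheck
// cells, so a scan that filters or skips millions of cells still notices the
// deadline, while dense scans are not slowed by per-cell clock reads. The
// same pattern in isolation (cellIterator and process() are hypothetical
// stand-ins):
//
//   long scanned = 0;
//   while (cellIterator.hasNext()) {
//     if (scanned++ % cellsPerHeartbeatCheck == 0
//         && System.currentTimeMillis() >= deadline) {
//       break;  // stop early and let the RPC layer send a heartbeat response
//     }
//     process(cellIterator.next());
//   }
// ---------------------------------------------------------------------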
checkScanOrder(prevCell, cell, comparator); prevCell = cell; http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index eef955e..3794e59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -217,7 +217,8 @@ public class TestPartialResultsFromClientSide { count++; } - assertTrue(scanner2.next() == null); + r2 = scanner2.next(); + assertTrue("r2: " + r2 + " Should be null", r2 == null); scanner1.close(); scanner2.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/43f24db8/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 67eca80..b2b718c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -143,7 +143,6 @@ public class TestCoprocessorInterface { public int getBatch() { return delegate.getBatch(); } - } public static class CoprocessorImpl extends BaseRegionObserver {