Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 89B5D2009A8 for ; Tue, 17 May 2016 21:49:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8854C1609F5; Tue, 17 May 2016 19:49:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 323A11607A8 for ; Tue, 17 May 2016 21:49:41 +0200 (CEST) Received: (qmail 191 invoked by uid 500); 17 May 2016 19:49:40 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 181 invoked by uid 99); 17 May 2016 19:49:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 May 2016 19:49:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 417BCDFB90; Tue, 17 May 2016 19:49:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stack@apache.org To: commits@hbase.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-15593 Time limit of scanning should be offered by client (Phil Yang) Date: Tue, 17 May 2016 19:49:40 +0000 (UTC) archived-at: Tue, 17 May 2016 19:49:42 -0000 Repository: hbase Updated Branches: refs/heads/master 9269b8199 -> e47bfb907 HBASE-15593 Time limit of scanning should be offered by client (Phil Yang) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e47bfb90 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e47bfb90 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e47bfb90 Branch: refs/heads/master Commit: e47bfb90786df6fa794af844028a4a0f50b3433c Parents: 9269b81 Author: stack Authored: Tue May 17 12:49:31 2016 -0700 Committer: stack Committed: Tue May 17 12:49:31 2016 -0700 ---------------------------------------------------------------------- .../hbase/protobuf/generated/RPCProtos.java | 106 +++++++++++++++++-- hbase-protocol/src/main/protobuf/RPC.proto | 1 + .../org/apache/hadoop/hbase/ipc/CallRunner.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 43 ++++++-- .../hadoop/hbase/ipc/RpcServerInterface.java | 5 + .../hbase/regionserver/RSRpcServices.java | 9 ++ .../TestScannerHeartbeatMessages.java | 19 ++-- 7 files changed, 158 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e47bfb90/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java index 2d4a430..d05eb57 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java @@ -3895,6 +3895,16 @@ public final class RPCProtos { * */ int getPriority(); + + // optional uint32 timeout = 7; + /** + * optional uint32 timeout = 7; + */ + boolean hasTimeout(); + /** + * optional uint32 timeout = 7; + */ + int getTimeout(); } /** * Protobuf type {@code hbase.pb.RequestHeader} @@ -3997,6 +4007,11 @@ public final class RPCProtos { priority_ = input.readUInt32(); break; } + case 56: { + bitField0_ |= 0x00000040; + timeout_ = input.readUInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -4210,6 +4225,22 @@ public final class RPCProtos { return priority_; } + // optional uint32 timeout = 7; + public static final int TIMEOUT_FIELD_NUMBER = 7; + private int timeout_; + /** + * optional uint32 timeout = 7; + */ + public boolean hasTimeout() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional uint32 timeout = 7; + */ + public int getTimeout() { + return timeout_; + } + private void initFields() { callId_ = 0; traceInfo_ = org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo.getDefaultInstance(); @@ -4217,6 +4248,7 @@ public final class RPCProtos { requestParam_ = false; cellBlockMeta_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance(); priority_ = 0; + timeout_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4248,6 +4280,9 @@ public final class RPCProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeUInt32(6, priority_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeUInt32(7, timeout_); + } getUnknownFields().writeTo(output); } @@ -4281,6 +4316,10 @@ public final class RPCProtos { size += com.google.protobuf.CodedOutputStream .computeUInt32Size(6, priority_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(7, timeout_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4334,6 +4373,11 @@ public final class RPCProtos { result = result && (getPriority() == other.getPriority()); } + result = result && (hasTimeout() == other.hasTimeout()); + if (hasTimeout()) { + result = result && (getTimeout() + == other.getTimeout()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -4371,6 +4415,10 @@ public final class RPCProtos { hash = (37 * hash) + PRIORITY_FIELD_NUMBER; hash = (53 * hash) + getPriority(); } + if (hasTimeout()) { + hash = (37 * hash) + TIMEOUT_FIELD_NUMBER; + hash = (53 * hash) + getTimeout(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -4506,6 +4554,8 @@ public final class RPCProtos { bitField0_ = (bitField0_ & ~0x00000010); priority_ = 0; bitField0_ = (bitField0_ & ~0x00000020); + timeout_ = 0; + bitField0_ = (bitField0_ & ~0x00000040); return this; } @@ -4566,6 +4616,10 @@ public final class RPCProtos { to_bitField0_ |= 0x00000020; } result.priority_ = priority_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000040; + } + result.timeout_ = timeout_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -4602,6 +4656,9 @@ public final class RPCProtos { if (other.hasPriority()) { setPriority(other.getPriority()); } + if (other.hasTimeout()) { + setTimeout(other.getTimeout()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -5124,6 +5181,39 @@ public final class RPCProtos { return this; } + // optional uint32 timeout = 7; + private int timeout_ ; + /** + * optional uint32 timeout = 7; + */ + public boolean hasTimeout() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional uint32 timeout = 7; + */ + public int getTimeout() { + return timeout_; + } + /** + * optional uint32 timeout = 7; + */ + public Builder setTimeout(int value) { + bitField0_ |= 0x00000040; + timeout_ = value; + onChanged(); + return this; + } + /** + * optional uint32 timeout = 7; + */ + public Builder clearTimeout() { + bitField0_ = (bitField0_ & ~0x00000040); + timeout_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.RequestHeader) } @@ -6141,17 +6231,17 @@ public final class RPCProtos { "llBlockMeta\022\016\n\006length\030\001 \001(\r\"|\n\021Exception" + "Response\022\034\n\024exception_class_name\030\001 \001(\t\022\023", "\n\013stack_trace\030\002 \001(\t\022\020\n\010hostname\030\003 \001(\t\022\014\n" + - "\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\270\001\n\rRe" + + "\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\311\001\n\rRe" + "questHeader\022\017\n\007call_id\030\001 \001(\r\022&\n\ntrace_in" + "fo\030\002 \001(\0132\022.hbase.pb.RPCTInfo\022\023\n\013method_n" + "ame\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\0220\n\017cell" + "_block_meta\030\005 \001(\0132\027.hbase.pb.CellBlockMe" + - "ta\022\020\n\010priority\030\006 \001(\r\"\203\001\n\016ResponseHeader\022" + - "\017\n\007call_id\030\001 \001(\r\022.\n\texception\030\002 \001(\0132\033.hb" + - "ase.pb.ExceptionResponse\0220\n\017cell_block_m" + - "eta\030\003 \001(\0132\027.hbase.pb.CellBlockMetaB<\n*or", - "g.apache.hadoop.hbase.protobuf.generated" + - "B\tRPCProtosH\001\240\001\001" + "ta\022\020\n\010priority\030\006 \001(\r\022\017\n\007timeout\030\007 \001(\r\"\203\001" + + "\n\016ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022.\n\texc" + + "eption\030\002 \001(\0132\033.hbase.pb.ExceptionRespons" + + "e\0220\n\017cell_block_meta\030\003 \001(\0132\027.hbase.pb.Ce", + "llBlockMetaB<\n*org.apache.hadoop.hbase.p" + + "rotobuf.generatedB\tRPCProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6187,7 +6277,7 @@ public final class RPCProtos { internal_static_hbase_pb_RequestHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_RequestHeader_descriptor, - new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", }); + new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", "Timeout", }); internal_static_hbase_pb_ResponseHeader_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_hbase_pb_ResponseHeader_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hbase/blob/e47bfb90/hbase-protocol/src/main/protobuf/RPC.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/RPC.proto b/hbase-protocol/src/main/protobuf/RPC.proto index 59bb03d..8413d25 100644 --- a/hbase-protocol/src/main/protobuf/RPC.proto +++ b/hbase-protocol/src/main/protobuf/RPC.proto @@ -125,6 +125,7 @@ message RequestHeader { // 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL. // See HConstants. optional uint32 priority = 6; + optional uint32 timeout = 7; } message ResponseHeader { http://git-wip-us.apache.org/repos/asf/hbase/blob/e47bfb90/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 3514245..00e08c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -114,7 +114,7 @@ public class CallRunner { } // make the call resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, - call.timestamp, this.status); + call.timestamp, this.status, call.timeout); } catch (Throwable e) { RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); errorThrowable = e; http://git-wip-us.apache.org/repos/asf/hbase/blob/e47bfb90/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index b9a9b26..483ce86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -264,6 +264,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; + /** + * Minimum allowable timeout (in milliseconds) in rpc request's header. This + * configuration exists to prevent the rpc service regarding this request as timeout immediately. + */ + private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; + private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; + /** Default value for above params */ private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds @@ -274,6 +281,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private final int maxRequestSize; private final int warnResponseTime; private final int warnResponseSize; + + private final int minClientRequestTimeout; + private final Server server; private final List services; @@ -302,6 +312,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected Connection connection; // connection to client protected long timestamp; // the time received when response is null // the time served when response is not null + protected int timeout; /** * Chain of buffers to send as response. */ @@ -325,7 +336,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { justification="Can't figure why this complaint is happening... see below") Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, - long size, TraceInfo tinfo, final InetAddress remoteAddress) { + long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) { this.id = id; this.service = service; this.md = md; @@ -343,6 +354,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.remoteAddress = remoteAddress; this.retryImmediatelySupported = connection == null? null: connection.retryImmediatelySupported; + this.timeout = timeout; } /** @@ -1275,13 +1287,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Fake 'call' for failed authorization response private static final int AUTHORIZATION_FAILED_CALLID = -1; private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, - null, null, this, null, 0, null, null); + null, null, this, null, 0, null, null, 0); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); // Fake 'call' for SASL context setup private static final int SASL_CALLID = -33; private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null, - 0, null, null); + 0, null, null, 0); // was authentication allowed with a fallback to simple auth private boolean authenticatedWithFallback; @@ -1699,7 +1711,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { LOG.warn(msg); - Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null); + Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0); setupResponse(null, fakeCall, e, msg); responder.doRespond(fakeCall); // Returning -1 closes out the connection. @@ -1874,7 +1886,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) { final Call callTooBig = new Call(id, this.service, null, null, null, null, this, - responder, totalRequestSize, null, null); + responder, totalRequestSize, null, null, 0); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, @@ -1923,7 +1935,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { final Call readParamsFailedCall = new Call(id, this.service, null, null, null, null, this, - responder, totalRequestSize, null, null); + responder, totalRequestSize, null, null, 0); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, t, msg + "; " + t.getMessage()); @@ -1934,8 +1946,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) : null; + int timeout = 0; + if (header.hasTimeout()){ + timeout = Math.max(minClientRequestTimeout, header.getTimeout()); + } Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, - totalRequestSize, traceInfo, this.addr); + totalRequestSize, traceInfo, this.addr, timeout); if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) { callQueueSize.add(-1 * call.getSize()); @@ -2087,7 +2103,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); - + this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, + DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT); this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE); // Start the listener here and let it bind to the port @@ -2228,6 +2245,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.secretManager = (SecretManager) secretManager; } + public Pair call(BlockingService service, MethodDescriptor md, + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) + throws IOException { + return call(service, md, param, cellScanner, receiveTime, status, 0); + } + /** * This is a server side method, which is invoked over RPC. On success * the return response has protobuf response payload. On failure, the @@ -2235,7 +2258,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { */ @Override public Pair call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, + int timeout) throws IOException { try { status.setRPC(md.getName(), new Object[]{param}, receiveTime); @@ -2245,6 +2269,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { //get an instance of the method arg type long startTime = System.currentTimeMillis(); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); + controller.setCallTimeout(timeout); Message result = service.callBlockingMethod(md, controller, param); long endTime = System.currentTimeMillis(); int processingTime = (int) (endTime - startTime); http://git-wip-us.apache.org/repos/asf/hbase/blob/e47bfb90/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index ab8b485..dd7e584 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -52,6 +52,11 @@ public interface RpcServerInterface { Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException, ServiceException; + Pair call(BlockingService service, MethodDescriptor md, + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, + int timeout) + throws IOException, ServiceException; + void setErrorHandler(HBaseRPCErrorHandler handler); HBaseRPCErrorHandler getErrorHandler(); http://git-wip-us.apache.org/repos/asf/hbase/blob/e47bfb90/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 95ab36d..ebd85bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController; import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -2698,6 +2699,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; } + if (controller instanceof TimeLimitedRpcController) { + TimeLimitedRpcController timeLimitedRpcController = + (TimeLimitedRpcController)controller; + if (timeLimitedRpcController.getCallTimeout() > 0) { + timeLimitDelta = Math.min(timeLimitDelta, + timeLimitedRpcController.getCallTimeout()); + } + } // Use half of whichever timeout value was more restrictive... But don't allow // the time limit to be less than the allowable minimum (could cause an // immediatate timeout before scanning any data). http://git-wip-us.apache.org/repos/asf/hbase/blob/e47bfb90/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index b8eacc7..a958ee0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -102,16 +103,16 @@ public class TestScannerHeartbeatMessages { private static int VALUE_SIZE = 128; private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + private static int SERVER_TIMEOUT = 6000; + // Time, in milliseconds, that the client will wait for a response from the server before timing // out. This value is used server side to determine when it is necessary to send a heartbeat // message to the client - private static int CLIENT_TIMEOUT = 2000; - - // The server limits itself to running for half of the CLIENT_TIMEOUT value. - private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2; + private static int CLIENT_TIMEOUT = SERVER_TIMEOUT / 3; // By default, at most one row's worth of cells will be retrieved before the time limit is reached - private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2; + private static int DEFAULT_ROW_SLEEP_TIME = CLIENT_TIMEOUT / 5; // By default, at most cells for two column families are retrieved before the time limit is // reached private static int DEFAULT_CF_SLEEP_TIME = DEFAULT_ROW_SLEEP_TIME / NUM_FAMILIES; @@ -124,8 +125,8 @@ public class TestScannerHeartbeatMessages { conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); - conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // Check the timeout condition after every cell @@ -140,7 +141,7 @@ public class TestScannerHeartbeatMessages { Table ht = TEST_UTIL.createTable(name, families); List puts = createPuts(rows, families, qualifiers, cellValue); ht.put(puts); - + ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); return ht; } @@ -282,7 +283,7 @@ public class TestScannerHeartbeatMessages { @Override public ReturnCode filterKeyValue(Cell v) throws IOException { try { - Thread.sleep(SERVER_TIME_LIMIT + 10); + Thread.sleep(CLIENT_TIMEOUT/2 + 10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }