Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F08DE200BD0 for ; Tue, 15 Nov 2016 19:49:14 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id EF242160B1F; Tue, 15 Nov 2016 18:49:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A8A74160B05 for ; Tue, 15 Nov 2016 19:49:12 +0100 (CET) Received: (qmail 87863 invoked by uid 500); 15 Nov 2016 18:49:11 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 87704 invoked by uid 99); 15 Nov 2016 18:49:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Nov 2016 18:49:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 85309E0C0A; Tue, 15 Nov 2016 18:49:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stack@apache.org To: commits@hbase.apache.org Date: Tue, 15 Nov 2016 18:49:14 -0000 Message-Id: <5588a16f3aec4d86b3ba642b11fbca36@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [04/52] [partial] hbase-site git commit: Published site at 4d1bff9e78884adf689dd587d65afe36a336c56b. archived-at: Tue, 15 Nov 2016 18:49:15 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/86fde03b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.BlockingServiceAndInterface.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.BlockingServiceAndInterface.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.BlockingServiceAndInterface.html index d5ae09a..f3dcd17 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.BlockingServiceAndInterface.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.BlockingServiceAndInterface.html @@ -28,3041 +28,3232 @@ 020 021import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 022 -023import com.google.common.util.concurrent.ThreadFactoryBuilder; -024 -025import java.io.ByteArrayInputStream; -026import java.io.ByteArrayOutputStream; -027import java.io.DataOutputStream; -028import java.io.IOException; -029import java.io.InputStream; -030import java.net.BindException; -031import java.net.InetAddress; -032import java.net.InetSocketAddress; -033import java.net.ServerSocket; -034import java.net.Socket; -035import java.net.SocketException; -036import java.net.UnknownHostException; -037import java.nio.ByteBuffer; -038import java.nio.channels.CancelledKeyException; -039import java.nio.channels.Channels; -040import java.nio.channels.ClosedChannelException; -041import java.nio.channels.GatheringByteChannel; -042import java.nio.channels.ReadableByteChannel; -043import java.nio.channels.SelectionKey; -044import java.nio.channels.Selector; -045import java.nio.channels.ServerSocketChannel; -046import java.nio.channels.SocketChannel; -047import java.nio.channels.WritableByteChannel; -048import java.security.GeneralSecurityException; -049import java.security.PrivilegedExceptionAction; -050import java.util.ArrayList; -051import java.util.Arrays; -052import java.util.Collections; -053import java.util.HashMap; -054import java.util.Iterator; -055import java.util.List; -056import java.util.Map; -057import java.util.Properties; -058import java.util.Set; -059import java.util.Timer; -060import java.util.TimerTask; -061import java.util.concurrent.ConcurrentHashMap; -062import java.util.concurrent.ConcurrentLinkedDeque; -063import java.util.concurrent.ExecutorService; -064import java.util.concurrent.Executors; -065import java.util.concurrent.LinkedBlockingQueue; -066import java.util.concurrent.atomic.AtomicInteger; -067import java.util.concurrent.atomic.LongAdder; -068import java.util.concurrent.locks.Lock; -069import java.util.concurrent.locks.ReentrantLock; -070 -071import javax.security.sasl.Sasl; -072import javax.security.sasl.SaslException; -073import javax.security.sasl.SaslServer; -074 -075import org.apache.commons.crypto.cipher.CryptoCipherFactory; -076import org.apache.commons.crypto.random.CryptoRandom; -077import org.apache.commons.crypto.random.CryptoRandomFactory; -078import org.apache.commons.logging.Log; -079import org.apache.commons.logging.LogFactory; -080import org.apache.hadoop.conf.Configuration; -081import org.apache.hadoop.hbase.CallQueueTooBigException; -082import org.apache.hadoop.hbase.CellScanner; -083import org.apache.hadoop.hbase.DoNotRetryIOException; -084import org.apache.hadoop.hbase.HBaseIOException; -085import org.apache.hadoop.hbase.HBaseInterfaceAudience; -086import org.apache.hadoop.hbase.HConstants; -087import org.apache.hadoop.hbase.Server; -088import org.apache.hadoop.hbase.classification.InterfaceAudience; -089import org.apache.hadoop.hbase.classification.InterfaceStability; -090import org.apache.hadoop.hbase.client.VersionInfoUtil; -091import org.apache.hadoop.hbase.codec.Codec; -092import org.apache.hadoop.hbase.conf.ConfigurationObserver; -093import org.apache.hadoop.hbase.exceptions.RegionMovedException; -094import org.apache.hadoop.hbase.exceptions.RequestTooBigException; -095import org.apache.hadoop.hbase.io.ByteBufferInputStream; +023import com.google.common.annotations.VisibleForTesting; +024import com.google.common.util.concurrent.ThreadFactoryBuilder; +025 +026import java.io.ByteArrayInputStream; +027import java.io.ByteArrayOutputStream; +028import java.io.DataOutputStream; +029import java.io.IOException; +030import java.io.InputStream; +031import java.net.BindException; +032import java.net.InetAddress; +033import java.net.InetSocketAddress; +034import java.net.ServerSocket; +035import java.net.Socket; +036import java.net.SocketException; +037import java.net.UnknownHostException; +038import java.nio.ByteBuffer; +039import java.nio.channels.CancelledKeyException; +040import java.nio.channels.Channels; +041import java.nio.channels.ClosedChannelException; +042import java.nio.channels.GatheringByteChannel; +043import java.nio.channels.ReadableByteChannel; +044import java.nio.channels.SelectionKey; +045import java.nio.channels.Selector; +046import java.nio.channels.ServerSocketChannel; +047import java.nio.channels.SocketChannel; +048import java.nio.channels.WritableByteChannel; +049import java.security.GeneralSecurityException; +050import java.security.PrivilegedExceptionAction; +051import java.util.ArrayList; +052import java.util.Arrays; +053import java.util.Collections; +054import java.util.HashMap; +055import java.util.Iterator; +056import java.util.List; +057import java.util.Map; +058import java.util.Properties; +059import java.util.Set; +060import java.util.Timer; +061import java.util.TimerTask; +062import java.util.concurrent.ConcurrentHashMap; +063import java.util.concurrent.ConcurrentLinkedDeque; +064import java.util.concurrent.ExecutorService; +065import java.util.concurrent.Executors; +066import java.util.concurrent.LinkedBlockingQueue; +067import java.util.concurrent.atomic.AtomicInteger; +068import java.util.concurrent.atomic.LongAdder; +069import java.util.concurrent.locks.Lock; +070import java.util.concurrent.locks.ReentrantLock; +071 +072import javax.security.sasl.Sasl; +073import javax.security.sasl.SaslException; +074import javax.security.sasl.SaslServer; +075 +076import org.apache.commons.crypto.cipher.CryptoCipherFactory; +077import org.apache.commons.crypto.random.CryptoRandom; +078import org.apache.commons.crypto.random.CryptoRandomFactory; +079import org.apache.commons.logging.Log; +080import org.apache.commons.logging.LogFactory; +081import org.apache.hadoop.conf.Configuration; +082import org.apache.hadoop.hbase.CallQueueTooBigException; +083import org.apache.hadoop.hbase.CellScanner; +084import org.apache.hadoop.hbase.DoNotRetryIOException; +085import org.apache.hadoop.hbase.HBaseIOException; +086import org.apache.hadoop.hbase.HBaseInterfaceAudience; +087import org.apache.hadoop.hbase.HConstants; +088import org.apache.hadoop.hbase.Server; +089import org.apache.hadoop.hbase.classification.InterfaceAudience; +090import org.apache.hadoop.hbase.classification.InterfaceStability; +091import org.apache.hadoop.hbase.client.VersionInfoUtil; +092import org.apache.hadoop.hbase.codec.Codec; +093import org.apache.hadoop.hbase.conf.ConfigurationObserver; +094import org.apache.hadoop.hbase.exceptions.RegionMovedException; +095import org.apache.hadoop.hbase.exceptions.RequestTooBigException; 096import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; 097import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 098import org.apache.hadoop.hbase.io.ByteBufferPool; 099import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 100import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 101import org.apache.hadoop.hbase.monitoring.TaskMonitor; -102import org.apache.hadoop.hbase.regionserver.RSRpcServices; -103import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -104import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -105import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; -106import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; -107import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; -108import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; -109import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; -110import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -111import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; -112import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; -113import org.apache.hadoop.hbase.security.AccessDeniedException; -114import org.apache.hadoop.hbase.security.AuthMethod; -115import org.apache.hadoop.hbase.security.HBasePolicyProvider; -116import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; -117import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; -118import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; -119import org.apache.hadoop.hbase.security.SaslStatus; -120import org.apache.hadoop.hbase.security.SaslUtil; -121import org.apache.hadoop.hbase.security.User; -122import org.apache.hadoop.hbase.security.UserProvider; -123import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; -124import org.apache.hadoop.hbase.util.ByteBufferUtils; -125import org.apache.hadoop.hbase.util.Bytes; -126import org.apache.hadoop.hbase.util.Pair; -127import org.apache.hadoop.hbase.util.Threads; -128import org.apache.hadoop.io.BytesWritable; -129import org.apache.hadoop.io.IOUtils; -130import org.apache.hadoop.io.IntWritable; -131import org.apache.hadoop.io.Writable; -132import org.apache.hadoop.io.WritableUtils; -133import org.apache.hadoop.io.compress.CompressionCodec; -134import org.apache.hadoop.security.UserGroupInformation; -135import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -136import org.apache.hadoop.security.authorize.AuthorizationException; -137import org.apache.hadoop.security.authorize.PolicyProvider; -138import org.apache.hadoop.security.authorize.ProxyUsers; -139import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; -140import org.apache.hadoop.security.token.SecretManager; -141import org.apache.hadoop.security.token.SecretManager.InvalidToken; -142import org.apache.hadoop.security.token.TokenIdentifier; -143import org.apache.hadoop.util.StringUtils; -144import org.apache.htrace.TraceInfo; -145import org.codehaus.jackson.map.ObjectMapper; -146 -147import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; -148import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; -149import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; -150import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; -151import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; -152import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; -153import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; -154import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; -155 -156/** -157 * An RPC server that hosts protobuf described Services. -158 * -159 * An RpcServer instance has a Listener that hosts the socket. Listener has fixed number -160 * of Readers in an ExecutorPool, 10 by default. The Listener does an accept and then -161 * round robin a Reader is chosen to do the read. The reader is registered on Selector. Read does -162 * total read off the channel and the parse from which it makes a Call. The call is wrapped in a -163 * CallRunner and passed to the scheduler to be run. Reader goes back to see if more to be done -164 * and loops till done. -165 * -166 * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it -167 * has given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run -168 * taking from the queue. They run the CallRunner#run method on each item gotten from queue -169 * and keep taking while the server is up. -170 * -171 * CallRunner#run executes the call. When done, asks the included Call to put itself on new -172 * queue for Responder to pull from and return result to client. -173 * -174 * @see BlockingRpcClient -175 */ -176@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) -177@InterfaceStability.Evolving -178public class RpcServer implements RpcServerInterface, ConfigurationObserver { -179 // LOG is being used in CallRunner and the log level is being changed in tests -180 public static final Log LOG = LogFactory.getLog(RpcServer.class); -181 private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION -182 = new CallQueueTooBigException(); -183 -184 private final boolean authorize; -185 private boolean isSecurityEnabled; -186 -187 public static final byte CURRENT_VERSION = 0; -188 -189 /** -190 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled. -191 */ -192 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH = -193 "hbase.ipc.server.fallback-to-simple-auth-allowed"; -194 -195 /** -196 * How many calls/handler are allowed in the queue. -197 */ -198 static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; -199 -200 private final CellBlockBuilder cellBlockBuilder; -201 -202 private static final String AUTH_FAILED_FOR = "Auth failed for "; -203 private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; -204 private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." + -205 Server.class.getName()); -206 protected SecretManager<TokenIdentifier> secretManager; -207 protected ServiceAuthorizationManager authManager; -208 -209 /** This is set to Call object before Handler invokes an RPC and ybdie -210 * after the call returns. -211 */ -212 protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>(); -213 -214 /** Keeps MonitoredRPCHandler per handler thread. */ -215 static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC -216 = new ThreadLocal<MonitoredRPCHandler>(); +102import org.apache.hadoop.hbase.nio.ByteBuff; +103import org.apache.hadoop.hbase.nio.MultiByteBuff; +104import org.apache.hadoop.hbase.nio.SingleByteBuff; +105import org.apache.hadoop.hbase.regionserver.RSRpcServices; +106import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +107import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +108import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; +109import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; +110import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; +111import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; +112import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; +113import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +114import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; +115import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; +116import org.apache.hadoop.hbase.security.AccessDeniedException; +117import org.apache.hadoop.hbase.security.AuthMethod; +118import org.apache.hadoop.hbase.security.HBasePolicyProvider; +119import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; +120import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; +121import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; +122import org.apache.hadoop.hbase.security.SaslStatus; +123import org.apache.hadoop.hbase.security.SaslUtil; +124import org.apache.hadoop.hbase.security.User; +125import org.apache.hadoop.hbase.security.UserProvider; +126import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; +127import org.apache.hadoop.hbase.util.ByteBufferUtils; +128import org.apache.hadoop.hbase.util.Bytes; +129import org.apache.hadoop.hbase.util.Pair; +130import org.apache.hadoop.hbase.util.Threads; +131import org.apache.hadoop.io.BytesWritable; +132import org.apache.hadoop.io.IOUtils; +133import org.apache.hadoop.io.IntWritable; +134import org.apache.hadoop.io.Writable; +135import org.apache.hadoop.io.WritableUtils; +136import org.apache.hadoop.io.compress.CompressionCodec; +137import org.apache.hadoop.security.UserGroupInformation; +138import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +139import org.apache.hadoop.security.authorize.AuthorizationException; +140import org.apache.hadoop.security.authorize.PolicyProvider; +141import org.apache.hadoop.security.authorize.ProxyUsers; +142import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; +143import org.apache.hadoop.security.token.SecretManager; +144import org.apache.hadoop.security.token.SecretManager.InvalidToken; +145import org.apache.hadoop.security.token.TokenIdentifier; +146import org.apache.hadoop.util.StringUtils; +147import org.apache.htrace.TraceInfo; +148import org.codehaus.jackson.map.ObjectMapper; +149 +150import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +151import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +152import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; +153import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; +154import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; +155import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +156import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +157import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +158import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; +159 +160/** +161 * An RPC server that hosts protobuf described Services. +162 * +163 * An RpcServer instance has a Listener that hosts the socket. Listener has fixed number +164 * of Readers in an ExecutorPool, 10 by default. The Listener does an accept and then +165 * round robin a Reader is chosen to do the read. The reader is registered on Selector. Read does +166 * total read off the channel and the parse from which it makes a Call. The call is wrapped in a +167 * CallRunner and passed to the scheduler to be run. Reader goes back to see if more to be done +168 * and loops till done. +169 * +170 * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it +171 * has given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run +172 * taking from the queue. They run the CallRunner#run method on each item gotten from queue +173 * and keep taking while the server is up. +174 * +175 * CallRunner#run executes the call. When done, asks the included Call to put itself on new +176 * queue for Responder to pull from and return result to client. +177 * +178 * @see BlockingRpcClient +179 */ +180@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +181@InterfaceStability.Evolving +182public class RpcServer implements RpcServerInterface, ConfigurationObserver { +183 // LOG is being used in CallRunner and the log level is being changed in tests +184 public static final Log LOG = LogFactory.getLog(RpcServer.class); +185 private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION +186 = new CallQueueTooBigException(); +187 +188 private final boolean authorize; +189 private boolean isSecurityEnabled; +190 +191 public static final byte CURRENT_VERSION = 0; +192 +193 /** +194 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled. +195 */ +196 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH = +197 "hbase.ipc.server.fallback-to-simple-auth-allowed"; +198 +199 /** +200 * How many calls/handler are allowed in the queue. +201 */ +202 static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; +203 +204 private final CellBlockBuilder cellBlockBuilder; +205 +206 private static final String AUTH_FAILED_FOR = "Auth failed for "; +207 private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; +208 private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." + +209 Server.class.getName()); +210 protected SecretManager<TokenIdentifier> secretManager; +211 protected ServiceAuthorizationManager authManager; +212 +213 /** This is set to Call object before Handler invokes an RPC and ybdie +214 * after the call returns. +215 */ +216 protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>(); 217 -218 protected final InetSocketAddress bindAddress; -219 protected int port; // port we listen on -220 protected InetSocketAddress address; // inet address we listen on -221 private int readThreads; // number of read threads -222 protected MetricsHBaseServer metrics; -223 -224 protected final Configuration conf; -225 -226 /** -227 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over -228 * this size, then we will reject the call (after parsing it though). It will go back to the -229 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The -230 * call queue size gets incremented after we parse a call and before we add it to the queue of -231 * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current -232 * size is kept in {@link #callQueueSizeInBytes}. -233 * @see {@link #callQueueSizeInBytes} -234 * @see {@link #DEFAULT_MAX_CALLQUEUE_SIZE} -235 * @see {@link #callQueueSizeInBytes} -236 */ -237 private final long maxQueueSizeInBytes; -238 private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; -239 -240 /** -241 * This is a running count of the size in bytes of all outstanding calls whether currently -242 * executing or queued waiting to be run. -243 */ -244 protected final LongAdder callQueueSizeInBytes = new LongAdder(); -245 -246 protected int socketSendBufferSize; -247 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm -248 protected final boolean tcpKeepAlive; // if T then use keepalives -249 protected final long purgeTimeout; // in milliseconds -250 -251 /** -252 * This flag is used to indicate to sub threads when they should go down. When we call -253 * {@link #start()}, all threads started will consult this flag on whether they should -254 * keep going. It is set to false when {@link #stop()} is called. -255 */ -256 volatile boolean running = true; -257 -258 /** -259 * This flag is set to true after all threads are up and 'running' and the server is then opened -260 * for business by the call to {@link #start()}. -261 */ -262 volatile boolean started = false; -263 -264 // maintains the set of client connections and handles idle timeouts -265 private ConnectionManager connectionManager; -266 private Listener listener = null; -267 protected Responder responder = null; -268 protected AuthenticationTokenSecretManager authTokenSecretMgr = null; -269 -270 protected HBaseRPCErrorHandler errorHandler = null; -271 -272 static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; -273 private static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION = -274 new RequestTooBigException(); +218 /** Keeps MonitoredRPCHandler per handler thread. */ +219 static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC +220 = new ThreadLocal<MonitoredRPCHandler>(); +221 +222 protected final InetSocketAddress bindAddress; +223 protected int port; // port we listen on +224 protected InetSocketAddress address; // inet address we listen on +225 private int readThreads; // number of read threads +226 protected MetricsHBaseServer metrics; +227 +228 protected final Configuration conf; +229 +230 /** +231 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over +232 * this size, then we will reject the call (after parsing it though). It will go back to the +233 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The +234 * call queue size gets incremented after we parse a call and before we add it to the queue of +235 * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current +236 * size is kept in {@link #callQueueSizeInBytes}. +237 * @see {@link #callQueueSizeInBytes} +238 * @see {@link #DEFAULT_MAX_CALLQUEUE_SIZE} +239 * @see {@link #callQueueSizeInBytes} +240 */ +241 private final long maxQueueSizeInBytes; +242 private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; +243 +244 /** +245 * This is a running count of the size in bytes of all outstanding calls whether currently +246 * executing or queued waiting to be run. +247 */ +248 protected final LongAdder callQueueSizeInBytes = new LongAdder(); +249 +250 protected int socketSendBufferSize; +251 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm +252 protected final boolean tcpKeepAlive; // if T then use keepalives +253 protected final long purgeTimeout; // in milliseconds +254 +255 /** +256 * This flag is used to indicate to sub threads when they should go down. When we call +257 * {@link #start()}, all threads started will consult this flag on whether they should +258 * keep going. It is set to false when {@link #stop()} is called. +259 */ +260 volatile boolean running = true; +261 +262 /** +263 * This flag is set to true after all threads are up and 'running' and the server is then opened +264 * for business by the call to {@link #start()}. +265 */ +266 volatile boolean started = false; +267 +268 // maintains the set of client connections and handles idle timeouts +269 private ConnectionManager connectionManager; +270 private Listener listener = null; +271 protected Responder responder = null; +272 protected AuthenticationTokenSecretManager authTokenSecretMgr = null; +273 +274 protected HBaseRPCErrorHandler errorHandler = null; 275 -276 private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; -277 private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; -278 -279 /** -280 * Minimum allowable timeout (in milliseconds) in rpc request's header. This -281 * configuration exists to prevent the rpc service regarding this request as timeout immediately. -282 */ -283 private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; -284 private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; -285 -286 /** Default value for above params */ -287 private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M -288 private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds -289 private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; -290 -291 private static final ObjectMapper MAPPER = new ObjectMapper(); -292 -293 private final int maxRequestSize; -294 private final int warnResponseTime; -295 private final int warnResponseSize; +276 static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; +277 private static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION = +278 new RequestTooBigException(); +279 +280 private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; +281 private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; +282 +283 /** +284 * Minimum allowable timeout (in milliseconds) in rpc request's header. This +285 * configuration exists to prevent the rpc service regarding this request as timeout immediately. +286 */ +287 private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; +288 private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; +289 +290 /** Default value for above params */ +291 private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M +292 private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds +293 private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; +294 +295 private static final ObjectMapper MAPPER = new ObjectMapper(); 296 -297 private final int minClientRequestTimeout; -298 -299 private final Server server; -300 private final List<BlockingServiceAndInterface> services; -301 -302 private final RpcScheduler scheduler; -303 -304 private UserProvider userProvider; +297 private final int maxRequestSize; +298 private final int warnResponseTime; +299 private final int warnResponseSize; +300 +301 private final int minClientRequestTimeout; +302 +303 private final Server server; +304 private final List<BlockingServiceAndInterface> services; 305 -306 private final ByteBufferPool reservoir; +306 private final RpcScheduler scheduler; 307 -308 private volatile boolean allowFallbackToSimpleAuth; +308 private UserProvider userProvider; 309 -310 /** -311 * Used to get details for scan with a scanner_id<br/> -312 * TODO try to figure out a better way and remove reference from regionserver package later. -313 */ -314 private RSRpcServices rsRpcServices; +310 private final ByteBufferPool reservoir; +311 // The requests and response will use buffers from ByteBufferPool, when the size of the +312 // request/response is at least this size. +313 // We make this to be 1/6th of the pool buffer size. +314 private final int minSizeForReservoirUse; 315 -316 /** -317 * Datastructure that holds all necessary to a method invocation and then afterward, carries -318 * the result. -319 */ -320 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) -321 @InterfaceStability.Evolving -322 public class Call implements RpcCallContext { -323 protected int id; // the client's call id -324 protected BlockingService service; -325 protected MethodDescriptor md; -326 protected RequestHeader header; -327 protected Message param; // the parameter passed -328 // Optional cell data passed outside of protobufs. -329 protected CellScanner cellScanner; -330 protected Connection connection; // connection to client -331 protected long timestamp; // the time received when response is null -332 // the time served when response is not null -333 protected int timeout; -334 protected long startTime; -335 protected long deadline;// the deadline to handle this call, if exceed we can drop it. -336 -337 /** -338 * Chain of buffers to send as response. -339 */ -340 protected BufferChain response; -341 protected Responder responder; -342 -343 protected long size; // size of current call -344 protected boolean isError; -345 protected TraceInfo tinfo; -346 private ByteBufferListOutputStream cellBlockStream = null; -347 -348 private User user; -349 private InetAddress remoteAddress; -350 private RpcCallback callback; -351 -352 private long responseCellSize = 0; -353 private long responseBlockSize = 0; -354 private boolean retryImmediatelySupported; -355 -356 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", -357 justification="Can't figure why this complaint is happening... see below") -358 Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, -359 Message param, CellScanner cellScanner, Connection connection, Responder responder, -360 long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) { -361 this.id = id; -362 this.service = service; -363 this.md = md; -364 this.header = header; -365 this.param = param; -366 this.cellScanner = cellScanner; -367 this.connection = connection; -368 this.timestamp = System.currentTimeMillis(); -369 this.response = null; -370 this.responder = responder; -371 this.isError = false; -372 this.size = size; -373 this.tinfo = tinfo; -374 this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH -375 this.remoteAddress = remoteAddress; -376 this.retryImmediatelySupported = -377 connection == null? null: connection.retryImmediatelySupported; -378 this.timeout = timeout; -379 this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE; -380 } -381 -382 /** -383 * Call is done. Execution happened and we returned results to client. It is now safe to -384 * cleanup. -385 */ -386 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", -387 justification="Presume the lock on processing request held by caller is protection enough") -388 void done() { -389 if (this.cellBlockStream != null) { -390 this.cellBlockStream.releaseResources();// This will return back the BBs which we -391 // got from pool. -392 this.cellBlockStream = null; -393 } -394 this.connection.decRpcCount(); // Say that we're done with this call. -395 } -396 -397 @Override -398 public String toString() { -399 return toShortString() + " param: " + -400 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + -401 " connection: " + connection.toString(); -402 } -403 -404 protected RequestHeader getHeader() { -405 return this.header; -406 } -407 -408 public boolean hasPriority() { -409 return this.header.hasPriority(); -410 } -411 -412 public int getPriority() { -413 return this.header.getPriority(); -414 } -415 -416 /* -417 * Short string representation without param info because param itself could be huge depends on -418 * the payload of a command -419 */ -420 String toShortString() { -421 String serviceName = this.connection.service != null ? -422 this.connection.service.getDescriptorForType().getName() : "null"; -423 return "callId: " + this.id + " service: " + serviceName + -424 " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") + -425 " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + -426 " connection: " + connection.toString() + -427 " deadline: " + deadline; -428 } -429 -430 String toTraceString() { -431 String serviceName = this.connection.service != null ? -432 this.connection.service.getDescriptorForType().getName() : ""; -433 String methodName = (this.md != null) ? this.md.getName() : ""; -434 return serviceName + "." + methodName; -435 } -436 -437 protected synchronized void setSaslTokenResponse(ByteBuffer response) { -438 ByteBuffer[] responseBufs = new ByteBuffer[1]; -439 responseBufs[0] = response; -440 this.response = new BufferChain(responseBufs); -441 } -442 -443 protected synchronized void setConnectionHeaderResponse(ByteBuffer response) { -444 ByteBuffer[] responseBufs = new ByteBuffer[1]; -445 responseBufs[0] = response; -446 this.response = new BufferChain(responseBufs); -447 } -448 -449 protected synchronized void setResponse(Object m, final CellScanner cells, -450 Throwable t, String errorMsg) { -451 if (this.isError) return; -452 if (t != null) this.isError = true; -453 BufferChain bc = null; -454 try { -455 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); -456 // Presume it a pb Message. Could be null. -457 Message result = (Message)m; -458 // Call id. -459 headerBuilder.setCallId(this.id); -460 if (t != null) { -461 setExceptionResponse(t, errorMsg, headerBuilder); -462 } -463 // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the -464 // reservoir when finished. This is hacky and the hack is not contained but benefits are -465 // high when we can avoid a big buffer allocation on each rpc. -466 List<ByteBuffer> cellBlock = null; -467 int cellBlockSize = 0; -468 if (reservoir != null) { -469 this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec, -470 this.connection.compressionCodec, cells, reservoir); -471 if (this.cellBlockStream != null) { -472 cellBlock = this.cellBlockStream.getByteBuffers(); -473 cellBlockSize = this.cellBlockStream.size(); -474 } -475 } else { -476 ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec, -477 this.connection.compressionCodec, cells); -478 if (b != null) { -479 cellBlockSize = b.remaining(); -480 cellBlock = new ArrayList<ByteBuffer>(1); -481 cellBlock.add(b); -482 } -483 } -484 -485 if (cellBlockSize > 0) { -486 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); -487 // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it. -488 cellBlockBuilder.setLength(cellBlockSize); -489 headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); -490 } -491 Message header = headerBuilder.build(); -492 ByteBuffer headerBuf = -493 createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock); -494 ByteBuffer[] responseBufs = null; -495 int cellBlockBufferSize = 0; -496 if (cellBlock != null) { -497 cellBlockBufferSize = cellBlock.size(); -498 responseBufs = new ByteBuffer[1 + cellBlockBufferSize]; -499 } else { -500 responseBufs = new ByteBuffer[1]; -501 } -502 responseBufs[0] = headerBuf; -503 if (cellBlock != null) { -504 for (int i = 0; i < cellBlockBufferSize; i++) { -505 responseBufs[i + 1] = cellBlock.get(i); -506 } -507 } -508 bc = new BufferChain(responseBufs); -509 if (connection.useWrap) { -510 bc = wrapWithSasl(bc); -511 } -512 } catch (IOException e) { -513 LOG.warn("Exception while creating response " + e); -514 } -515 this.response = bc; -516 // Once a response message is created and set to this.response, this Call can be treated as -517 // done. The Responder thread will do the n/w write of this message back to client. -518 if (this.callback != null) { -519 try { -520 this.callback.run(); -521 } catch (Exception e) { -522 // Don't allow any exception here to kill this handler thread. -523 LOG.warn("Exception while running the Rpc Callback.", e); -524 } -525 } -526 } -527 -528 private void setExceptionResponse(Throwable t, String errorMsg, -529 ResponseHeader.Builder headerBuilder) { -530 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder(); -531 exceptionBuilder.setExceptionClassName(t.getClass().getName()); -532 exceptionBuilder.setStackTrace(errorMsg); -533 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException); -534 if (t instanceof RegionMovedException) { -535 // Special casing for this exception. This is only one carrying a payload. -536 // Do this instead of build a generic system for allowing exceptions carry -537 // any kind of payload. -538 RegionMovedException rme = (RegionMovedException)t; -539 exceptionBuilder.setHostname(rme.getHostname()); -540 exceptionBuilder.setPort(rme.getPort()); -541 } -542 // Set the exception as the result of the method invocation. -543 headerBuilder.setException(exceptionBuilder.build()); -544 } -545 -546 private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, -547 int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException { -548 // Organize the response as a set of bytebuffers rather than collect it all together inside -549 // one big byte array; save on allocations. -550 // for writing the header, we check if there is available space in the buffers -551 // created for the cellblock itself. If there is space for the header, we reuse -552 // the last buffer in the cellblock. This applies to the cellblock created from the -553 // pool or even the onheap cellblock buffer in case there is no pool enabled. -554 // Possible reuse would avoid creating a temporary array for storing the header every time. -555 ByteBuffer possiblePBBuf = -556 (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; -557 int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, -558 resultVintSize = 0; -559 if (header != null) { -560 headerSerializedSize = header.getSerializedSize(); -561 headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize); -562 } -563 if (result != null) { -564 resultSerializedSize = result.getSerializedSize(); -565 resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize); -566 } -567 // calculate the total size -568 int totalSize = headerSerializedSize + headerVintSize -569 + (resultSerializedSize + resultVintSize) -570 + cellBlockSize; -571 int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize -