Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E9C93200C7F for ; Tue, 9 May 2017 17:00:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E87CD160BC8; Tue, 9 May 2017 15:00:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CED0D160BCE for ; Tue, 9 May 2017 17:00:27 +0200 (CEST) Received: (qmail 76226 invoked by uid 500); 9 May 2017 15:00:23 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 73871 invoked by uid 99); 9 May 2017 15:00:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 May 2017 15:00:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6F607DFAF3; Tue, 9 May 2017 15:00:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Tue, 09 May 2017 15:00:36 -0000 Message-Id: <6d67088937584a8ba6f813eeaa6d671a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Tue, 09 May 2017 15:00:30 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/7ef4c5a9/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.Connection.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.Connection.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.Connection.html index 5cc356a..9e1c66c 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.Connection.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.Connection.html @@ -70,2037 +70,1559 @@ 062import org.apache.hadoop.hbase.client.VersionInfoUtil; 063import org.apache.hadoop.hbase.codec.Codec; 064import org.apache.hadoop.hbase.conf.ConfigurationObserver; -065import org.apache.hadoop.hbase.exceptions.RegionMovedException; -066import org.apache.hadoop.hbase.exceptions.RequestTooBigException; -067import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; -068import org.apache.hadoop.hbase.io.ByteBufferOutputStream; -069import org.apache.hadoop.hbase.io.ByteBufferPool; -070import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; -071import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -072import org.apache.hadoop.hbase.monitoring.TaskMonitor; -073import org.apache.hadoop.hbase.nio.ByteBuff; -074import org.apache.hadoop.hbase.nio.MultiByteBuff; -075import org.apache.hadoop.hbase.nio.SingleByteBuff; -076import org.apache.hadoop.hbase.regionserver.RSRpcServices; -077import org.apache.hadoop.hbase.security.AccessDeniedException; -078import org.apache.hadoop.hbase.security.AuthMethod; -079import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; -080import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; -081import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; -082import org.apache.hadoop.hbase.security.SaslStatus; -083import org.apache.hadoop.hbase.security.SaslUtil; -084import org.apache.hadoop.hbase.security.User; -085import org.apache.hadoop.hbase.security.UserProvider; -086import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; -087import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; -088import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; -089import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; -090import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; -091import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; -092import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; -093import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; -094import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; -095import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; -096import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; -097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -099import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; -100import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; -101import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; -102import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; -103import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; -104import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -105import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; -106import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; -107import org.apache.hadoop.hbase.util.ByteBufferUtils; -108import org.apache.hadoop.hbase.util.Bytes; -109import org.apache.hadoop.hbase.util.Pair; -110import org.apache.hadoop.io.BytesWritable; -111import org.apache.hadoop.io.Writable; -112import org.apache.hadoop.io.WritableUtils; -113import org.apache.hadoop.io.compress.CompressionCodec; -114import org.apache.hadoop.security.UserGroupInformation; -115import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -116import org.apache.hadoop.security.authorize.AuthorizationException; -117import org.apache.hadoop.security.authorize.PolicyProvider; -118import org.apache.hadoop.security.authorize.ProxyUsers; -119import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; -120import org.apache.hadoop.security.token.SecretManager; -121import org.apache.hadoop.security.token.SecretManager.InvalidToken; -122import org.apache.hadoop.security.token.TokenIdentifier; -123import org.apache.hadoop.util.StringUtils; -124import org.apache.htrace.TraceInfo; -125import org.codehaus.jackson.map.ObjectMapper; -126 -127import com.google.common.annotations.VisibleForTesting; -128 -129/** -130 * An RPC server that hosts protobuf described Services. -131 * -132 */ -133@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) -134@InterfaceStability.Evolving -135public abstract class RpcServer implements RpcServerInterface, -136 ConfigurationObserver { -137 // LOG is being used in CallRunner and the log level is being changed in tests -138 public static final Log LOG = LogFactory.getLog(RpcServer.class); -139 protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION -140 = new CallQueueTooBigException(); -141 -142 private final boolean authorize; -143 protected boolean isSecurityEnabled; +065import org.apache.hadoop.hbase.exceptions.RequestTooBigException; +066import org.apache.hadoop.hbase.io.ByteBufferOutputStream; +067import org.apache.hadoop.hbase.io.ByteBufferPool; +068import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; +069import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +070import org.apache.hadoop.hbase.monitoring.TaskMonitor; +071import org.apache.hadoop.hbase.nio.ByteBuff; +072import org.apache.hadoop.hbase.nio.MultiByteBuff; +073import org.apache.hadoop.hbase.nio.SingleByteBuff; +074import org.apache.hadoop.hbase.regionserver.RSRpcServices; +075import org.apache.hadoop.hbase.security.AccessDeniedException; +076import org.apache.hadoop.hbase.security.AuthMethod; +077import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; +078import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; +079import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; +080import org.apache.hadoop.hbase.security.SaslStatus; +081import org.apache.hadoop.hbase.security.SaslUtil; +082import org.apache.hadoop.hbase.security.User; +083import org.apache.hadoop.hbase.security.UserProvider; +084import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; +085import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +086import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; +087import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +088import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; +089import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +090import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +091import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +092import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; +093import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +094import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +096import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; +097import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; +098import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; +099import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +100import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; +101import org.apache.hadoop.hbase.util.Bytes; +102import org.apache.hadoop.hbase.util.Pair; +103import org.apache.hadoop.io.BytesWritable; +104import org.apache.hadoop.io.Writable; +105import org.apache.hadoop.io.WritableUtils; +106import org.apache.hadoop.io.compress.CompressionCodec; +107import org.apache.hadoop.security.UserGroupInformation; +108import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +109import org.apache.hadoop.security.authorize.AuthorizationException; +110import org.apache.hadoop.security.authorize.PolicyProvider; +111import org.apache.hadoop.security.authorize.ProxyUsers; +112import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; +113import org.apache.hadoop.security.token.SecretManager; +114import org.apache.hadoop.security.token.SecretManager.InvalidToken; +115import org.apache.hadoop.security.token.TokenIdentifier; +116import org.apache.htrace.TraceInfo; +117import org.codehaus.jackson.map.ObjectMapper; +118 +119import com.google.common.annotations.VisibleForTesting; +120 +121/** +122 * An RPC server that hosts protobuf described Services. +123 * +124 */ +125@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +126@InterfaceStability.Evolving +127public abstract class RpcServer implements RpcServerInterface, +128 ConfigurationObserver { +129 // LOG is being used in CallRunner and the log level is being changed in tests +130 public static final Log LOG = LogFactory.getLog(RpcServer.class); +131 protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION +132 = new CallQueueTooBigException(); +133 +134 private final boolean authorize; +135 protected boolean isSecurityEnabled; +136 +137 public static final byte CURRENT_VERSION = 0; +138 +139 /** +140 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled. +141 */ +142 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH = +143 "hbase.ipc.server.fallback-to-simple-auth-allowed"; 144 -145 public static final byte CURRENT_VERSION = 0; -146 -147 /** -148 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled. -149 */ -150 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH = -151 "hbase.ipc.server.fallback-to-simple-auth-allowed"; -152 -153 /** -154 * How many calls/handler are allowed in the queue. -155 */ -156 protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; -157 -158 protected final CellBlockBuilder cellBlockBuilder; -159 -160 protected static final String AUTH_FAILED_FOR = "Auth failed for "; -161 protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; -162 protected static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." -163 + Server.class.getName()); -164 protected SecretManager<TokenIdentifier> secretManager; -165 protected ServiceAuthorizationManager authManager; +145 /** +146 * How many calls/handler are allowed in the queue. +147 */ +148 protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; +149 +150 protected final CellBlockBuilder cellBlockBuilder; +151 +152 protected static final String AUTH_FAILED_FOR = "Auth failed for "; +153 protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; +154 protected static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +155 + Server.class.getName()); +156 protected SecretManager<TokenIdentifier> secretManager; +157 protected ServiceAuthorizationManager authManager; +158 +159 /** This is set to Call object before Handler invokes an RPC and ybdie +160 * after the call returns. +161 */ +162 protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>(); +163 +164 /** Keeps MonitoredRPCHandler per handler thread. */ +165 protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>(); 166 -167 /** This is set to Call object before Handler invokes an RPC and ybdie -168 * after the call returns. -169 */ -170 protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>(); -171 -172 /** Keeps MonitoredRPCHandler per handler thread. */ -173 protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>(); -174 -175 protected final InetSocketAddress bindAddress; -176 -177 protected MetricsHBaseServer metrics; -178 -179 protected final Configuration conf; -180 -181 /** -182 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over -183 * this size, then we will reject the call (after parsing it though). It will go back to the -184 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The -185 * call queue size gets incremented after we parse a call and before we add it to the queue of -186 * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current -187 * size is kept in {@link #callQueueSizeInBytes}. -188 * @see #callQueueSizeInBytes -189 * @see #DEFAULT_MAX_CALLQUEUE_SIZE -190 */ -191 protected final long maxQueueSizeInBytes; -192 protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; -193 -194 /** -195 * This is a running count of the size in bytes of all outstanding calls whether currently -196 * executing or queued waiting to be run. -197 */ -198 protected final LongAdder callQueueSizeInBytes = new LongAdder(); -199 -200 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm -201 protected final boolean tcpKeepAlive; // if T then use keepalives -202 -203 /** -204 * This flag is used to indicate to sub threads when they should go down. When we call -205 * {@link #start()}, all threads started will consult this flag on whether they should -206 * keep going. It is set to false when {@link #stop()} is called. -207 */ -208 volatile boolean running = true; +167 protected final InetSocketAddress bindAddress; +168 +169 protected MetricsHBaseServer metrics; +170 +171 protected final Configuration conf; +172 +173 /** +174 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over +175 * this size, then we will reject the call (after parsing it though). It will go back to the +176 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The +177 * call queue size gets incremented after we parse a call and before we add it to the queue of +178 * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current +179 * size is kept in {@link #callQueueSizeInBytes}. +180 * @see #callQueueSizeInBytes +181 * @see #DEFAULT_MAX_CALLQUEUE_SIZE +182 */ +183 protected final long maxQueueSizeInBytes; +184 protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; +185 +186 /** +187 * This is a running count of the size in bytes of all outstanding calls whether currently +188 * executing or queued waiting to be run. +189 */ +190 protected final LongAdder callQueueSizeInBytes = new LongAdder(); +191 +192 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm +193 protected final boolean tcpKeepAlive; // if T then use keepalives +194 +195 /** +196 * This flag is used to indicate to sub threads when they should go down. When we call +197 * {@link #start()}, all threads started will consult this flag on whether they should +198 * keep going. It is set to false when {@link #stop()} is called. +199 */ +200 volatile boolean running = true; +201 +202 /** +203 * This flag is set to true after all threads are up and 'running' and the server is then opened +204 * for business by the call to {@link #start()}. +205 */ +206 volatile boolean started = false; +207 +208 protected AuthenticationTokenSecretManager authTokenSecretMgr = null; 209 -210 /** -211 * This flag is set to true after all threads are up and 'running' and the server is then opened -212 * for business by the call to {@link #start()}. -213 */ -214 volatile boolean started = false; +210 protected HBaseRPCErrorHandler errorHandler = null; +211 +212 protected static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; +213 protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION = +214 new RequestTooBigException(); 215 -216 protected AuthenticationTokenSecretManager authTokenSecretMgr = null; -217 -218 protected HBaseRPCErrorHandler errorHandler = null; -219 -220 protected static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; -221 protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION = -222 new RequestTooBigException(); -223 -224 protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; -225 protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; -226 -227 /** -228 * Minimum allowable timeout (in milliseconds) in rpc request's header. This -229 * configuration exists to prevent the rpc service regarding this request as timeout immediately. -230 */ -231 protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; -232 protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; -233 -234 /** Default value for above params */ -235 protected static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M -236 protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds -237 protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; +216 protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; +217 protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; +218 +219 /** +220 * Minimum allowable timeout (in milliseconds) in rpc request's header. This +221 * configuration exists to prevent the rpc service regarding this request as timeout immediately. +222 */ +223 protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; +224 protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; +225 +226 /** Default value for above params */ +227 protected static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M +228 protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds +229 protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; +230 +231 protected static final ObjectMapper MAPPER = new ObjectMapper(); +232 +233 protected final int maxRequestSize; +234 protected final int warnResponseTime; +235 protected final int warnResponseSize; +236 +237 protected final int minClientRequestTimeout; 238 -239 protected static final ObjectMapper MAPPER = new ObjectMapper(); -240 -241 protected final int maxRequestSize; -242 protected final int warnResponseTime; -243 protected final int warnResponseSize; -244 -245 protected final int minClientRequestTimeout; -246 -247 protected final Server server; -248 protected final List<BlockingServiceAndInterface> services; -249 -250 protected final RpcScheduler scheduler; +239 protected final Server server; +240 protected final List<BlockingServiceAndInterface> services; +241 +242 protected final RpcScheduler scheduler; +243 +244 protected UserProvider userProvider; +245 +246 protected final ByteBufferPool reservoir; +247 // The requests and response will use buffers from ByteBufferPool, when the size of the +248 // request/response is at least this size. +249 // We make this to be 1/6th of the pool buffer size. +250 protected final int minSizeForReservoirUse; 251 -252 protected UserProvider userProvider; +252 protected volatile boolean allowFallbackToSimpleAuth; 253 -254 protected final ByteBufferPool reservoir; -255 // The requests and response will use buffers from ByteBufferPool, when the size of the -256 // request/response is at least this size. -257 // We make this to be 1/6th of the pool buffer size. -258 protected final int minSizeForReservoirUse; +254 /** +255 * Used to get details for scan with a scanner_id<br/> +256 * TODO try to figure out a better way and remove reference from regionserver package later. +257 */ +258 private RSRpcServices rsRpcServices; 259 -260 protected volatile boolean allowFallbackToSimpleAuth; -261 -262 /** -263 * Used to get details for scan with a scanner_id<br/> -264 * TODO try to figure out a better way and remove reference from regionserver package later. -265 */ -266 private RSRpcServices rsRpcServices; -267 -268 /** -269 * Datastructure that holds all necessary to a method invocation and then afterward, carries -270 * the result. -271 */ -272 @InterfaceStability.Evolving -273 @InterfaceAudience.Private -274 public abstract class Call implements RpcCall { -275 protected int id; // the client's call id -276 protected BlockingService service; -277 protected MethodDescriptor md; -278 protected RequestHeader header; -279 protected Message param; // the parameter passed -280 // Optional cell data passed outside of protobufs. -281 protected CellScanner cellScanner; -282 protected Connection connection; // connection to client -283 protected long timestamp; // the time received when response is null -284 // the time served when response is not null -285 protected int timeout; -286 protected long startTime; -287 protected long deadline;// the deadline to handle this call, if exceed we can drop it. -288 -289 /** -290 * Chain of buffers to send as response. -291 */ -292 protected BufferChain response; +260 @FunctionalInterface +261 protected static interface CallCleanup { +262 void run(); +263 } +264 +265 /** Reads calls from a connection and queues them for handling. */ +266 @edu.umd.cs.findbugs.annotations.SuppressWarnings( +267 value="VO_VOLATILE_INCREMENT", +268 justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") +269 public abstract class Connection implements Closeable { +270 // If initial preamble with version and magic has been read or not. +271 protected boolean connectionPreambleRead = false; +272 // If the connection header has been read or not. +273 protected boolean connectionHeaderRead = false; +274 +275 protected CallCleanup callCleanup; +276 +277 // Cache the remote host & port info so that even if the socket is +278 // disconnected, we can say where it used to connect to. +279 protected String hostAddress; +280 protected int remotePort; +281 protected InetAddress addr; +282 protected ConnectionHeader connectionHeader; +283 +284 /** +285 * Codec the client asked use. +286 */ +287 protected Codec codec; +288 /** +289 * Compression codec the client asked us use. +290 */ +291 protected CompressionCodec compressionCodec; +292 protected BlockingService service; 293 -294 protected long size; // size of current call -295 protected boolean isError; -296 protected TraceInfo tinfo; -297 protected ByteBufferListOutputStream cellBlockStream = null; -298 protected CallCleanup reqCleanup = null; -299 -300 protected User user; -301 protected InetAddress remoteAddress; -302 protected RpcCallback rpcCallback; -303 -304 private long responseCellSize = 0; -305 private long responseBlockSize = 0; -306 // cumulative size of serialized exceptions -307 private long exceptionSize = 0; -308 private boolean retryImmediatelySupported; -309 -310 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", -311 justification="Can't figure why this complaint is happening... see below") -312 Call(int id, final BlockingService service, final MethodDescriptor md, -313 RequestHeader header, Message param, CellScanner cellScanner, -314 Connection connection, long size, TraceInfo tinfo, -315 final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { -316 this.id = id; -317 this.service = service; -318 this.md = md; -319 this.header = header; -320 this.param = param; -321 this.cellScanner = cellScanner; -322 this.connection = connection; -323 this.timestamp = System.currentTimeMillis(); -324 this.response = null; -325 this.isError = false; -326 this.size = size; -327 this.tinfo = tinfo; -328 this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH -329 this.remoteAddress = remoteAddress; -330 this.retryImmediatelySupported = -331 connection == null? null: connection.retryImmediatelySupported; -332 this.timeout = timeout; -333 this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE; -334 this.reqCleanup = reqCleanup; -335 } -336 -337 /** -338 * Call is done. Execution happened and we returned results to client. It is -339 * now safe to cleanup. -340 */ -341 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", -342 justification = "Presume the lock on processing request held by caller is protection enough") -343 void done() { -344 if (this.cellBlockStream != null) { -345 // This will return back the BBs which we got from pool. -346 this.cellBlockStream.releaseResources(); -347 this.cellBlockStream = null; -348 } -349 // If the call was run successfuly, we might have already returned the BB -350 // back to pool. No worries..Then inputCellBlock will be null -351 cleanup(); +294 protected AuthMethod authMethod; +295 protected boolean saslContextEstablished; +296 protected boolean skipInitialSaslHandshake; +297 private ByteBuffer unwrappedData; +298 // When is this set? FindBugs wants to know! Says NP +299 private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); +300 protected boolean useSasl; +301 protected SaslServer saslServer; +302 protected CryptoAES cryptoAES; +303 protected boolean useWrap = false; +304 protected boolean useCryptoAesWrap = false; +305 // Fake 'call' for failed authorization response +306 protected static final int AUTHORIZATION_FAILED_CALLID = -1; +307 protected ServerCall authFailedCall; +308 protected ByteArrayOutputStream authFailedResponse = +309 new ByteArrayOutputStream(); +310 // Fake 'call' for SASL context setup +311 protected static final int SASL_CALLID = -33; +312 protected ServerCall saslCall; +313 // Fake 'call' for connection header response +314 protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34; +315 protected ServerCall setConnectionHeaderResponseCall; +316 +317 // was authentication allowed with a fallback to simple auth +318 protected boolean authenticatedWithFallback; +319 +320 protected boolean retryImmediatelySupported = false; +321 +322 public UserGroupInformation attemptingUser = null; // user name before auth +323 protected User user = null; +324 protected UserGroupInformation ugi = null; +325 +326 public Connection() { +327 this.callCleanup = null; +328 } +329 +330 @Override +331 public String toString() { +332 return getHostAddress() + ":" + remotePort; +333 } +334 +335 public String getHostAddress() { +336 return hostAddress; +337 } +338 +339 public InetAddress getHostInetAddress() { +340 return addr; +341 } +342 +343 public int getRemotePort() { +344 return remotePort; +345 } +346 +347 public VersionInfo getVersionInfo() { +348 if (connectionHeader.hasVersionInfo()) { +349 return connectionHeader.getVersionInfo(); +350 } +351 return null; 352 } 353 -354 @Override -355 public void cleanup() { -356 if (this.reqCleanup != null) { -357 this.reqCleanup.run(); -358 this.reqCleanup = null; -359 } -360 } -361 -362 @Override -363 public String toString() { -364 return toShortString() + " param: " + -365 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + -366 " connection: " + connection.toString(); -367 } -368 -369 @Override -370 public RequestHeader getHeader() { -371 return this.header; -372 } -373 -374 @Override -375 public int getPriority() { -376 return this.header.getPriority(); +354 protected String getFatalConnectionString(final int version, final byte authByte) { +355 return "serverVersion=" + CURRENT_VERSION + +356 ", clientVersion=" + version + ", authMethod=" + authByte + +357 ", authSupported=" + (authMethod != null) + " from " + toString(); +358 } +359 +360 protected UserGroupInformation getAuthorizedUgi(String authorizedId) +361 throws IOException { +362 UserGroupInformation authorizedUgi; +363 if (authMethod == AuthMethod.DIGEST) { +364 TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId, +365 secretManager); +366 authorizedUgi = tokenId.getUser(); +367 if (authorizedUgi == null) { +368 throw new AccessDeniedException( +369 "Can't retrieve username from tokenIdentifier."); +370 } +371 authorizedUgi.addTokenIdentifier(tokenId); +372 } else { +373 authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId); +374 } +375 authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod()); +376 return authorizedUgi; 377 } 378 -379 /* -380 * Short string representation without param info because param itself could be huge depends on -381 * the payload of a command +379 /** +380 * Set up cell block codecs +381 * @throws FatalConnectionException 382 */ -383 @Override -384 public String toShortString() { -385 String serviceName = this.connection.service != null ? -386 this.connection.service.getDescriptorForType().getName() : "null"; -387 return "callId: " + this.id + " service: " + serviceName + -388 " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") + -389 " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + -390 " connection: " + connection.toString() + -391 " deadline: " + deadline; -392 } -393 -394 protected synchronized void setSaslTokenResponse(ByteBuffer response) { -395 ByteBuffer[] responseBufs = new ByteBuffer[1]; -396 responseBufs[0] = response; -397 this.response = new BufferChain(responseBufs); -398 } -399 -400 protected synchronized void setConnectionHeaderResponse(ByteBuffer response) { -401 ByteBuffer[] responseBufs = new ByteBuffer[1]; -402 responseBufs[0] = response; -403 this.response = new BufferChain(responseBufs); -404 } -405 -406 @Override -407 public synchronized void setResponse(Message m, final CellScanner cells, -408 Throwable t, String errorMsg) { -409 if (this.isError) return; -410 if (t != null) this.isError = true; -411 BufferChain bc = null; -412 try { -413 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); -414 // Call id. -415 headerBuilder.setCallId(this.id); -416 if (t != null) { -417 setExceptionResponse(t, errorMsg, headerBuilder); -418 } -419 // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the -420 // reservoir when finished. This is hacky and the hack is not contained but benefits are -421 // high when we can avoid a big buffer allocation on each rpc. -422 List<ByteBuffer> cellBlock = null; -423 int cellBlockSize = 0; -424 if (reservoir != null) { -425 this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec, -426 this.connection.compressionCodec, cells, reservoir); -427 if (this.cellBlockStream != null) { -428 cellBlock = this.cellBlockStream.getByteBuffers(); -429 cellBlockSize = this.cellBlockStream.size(); -430 } -431 } else { -432 ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec, -433 this.connection.compressionCodec, cells); -434 if (b != null) { -435 cellBlockSize = b.remaining(); -436 cellBlock = new ArrayList<>(1); -437 cellBlock.add(b); -438 } -439 } -440 -441 if (cellBlockSize > 0) { -442 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); -443 // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it. -444 cellBlockBuilder.setLength(cellBlockSize); -445 headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); -446 } -447 Message header = headerBuilder.build(); -448 ByteBuffer headerBuf = -449 createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock); -450 ByteBuffer[] responseBufs = null; -451 int cellBlockBufferSize = 0; -452 if (cellBlock != null) { -453 cellBlockBufferSize = cellBlock.size(); -454 responseBufs = new ByteBuffer[1 + cellBlockBufferSize]; -455 } else { -456 responseBufs = new ByteBuffer[1]; -457 } -458 responseBufs[0] = headerBuf; -459 if (cellBlock != null) { -460 for (int i = 0; i < cellBlockBufferSize; i++) { -461 responseBufs[i + 1] = cellBlock.get(i); -462 } -463 } -464 bc = new BufferChain(responseBufs); -465 if (connection.useWrap) { -466 bc = wrapWithSasl(bc); -467 } -468 } catch (IOException e) { -469 LOG.warn("Exception while creating response " + e); -470 } -471 this.response = bc; -472 // Once a response message is created and set to this.response, this Call can be treated as -473 // done. The Responder thread will do the n/w write of this message back to client. -474 if (this.rpcCallback != null) { -475 try { -476 this.rpcCallback.run(); -477 } catch (Exception e) { -478 // Don't allow any exception here to kill this handler thread. -479 LOG.warn("Exception while running the Rpc Callback.", e); -480 } -481 } -482 } -483 -484 protected void setExceptionResponse(Throwable t, String errorMsg, -485 ResponseHeader.Builder headerBuilder) { -486 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder(); -487 exceptionBuilder.setExceptionClassName(t.getClass().getName()); -488 exceptionBuilder.setStackTrace(errorMsg); -489 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException); -490 if (t instanceof RegionMovedException) { -491 // Special casing for this exception. This is only one carrying a payload. -492 // Do this instead of build a generic system for allowing exceptions carry -493 // any kind of payload. -494 RegionMovedException rme = (RegionMovedException)t; -495 exceptionBuilder.setHostname(rme.getHostname()); -496 exceptionBuilder.setPort(rme.getPort()); -497 } -498 // Set the exception as the result of the method invocation. -499 headerBuilder.setException(exceptionBuilder.build()); -500 } -501 -502 protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header, -503 int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException { -504 // Organize the response as a set of bytebuffers rather than collect it all together inside -505 // one big byte array; save on allocations. -506 // for writing the header, we check if there is available space in the buffers -507 // created for the cellblock itself. If there is space for the header, we reuse -508 // the last buffer in the cellblock. This applies to the cellblock created from the -509 // pool or even the onheap cellblock buffer in case there is no pool enabled. -510 // Possible reuse would avoid creating a temporary array for storing the header every time. -511 ByteBuffer possiblePBBuf = -512 (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; -513 int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, -514 resultVintSize = 0; -515 if (header != null) { -516 headerSerializedSize = header.getSerializedSize(); -517 headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize); -518 } -519 if (result != null) { -520 resultSerializedSize = result.getSerializedSize(); -521 resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize); -522 } -523 // calculate the total size -524 int totalSize = headerSerializedSize + headerVintSize -525 + (resultSerializedSize + resultVintSize) -526 + cellBlockSize; -527 int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize -528 + resultVintSize + Bytes.SIZEOF_INT; -529 // Only if the last buffer has enough space for header use it. Else allocate -530 // a new buffer. Assume they are all flipped -531 if (possiblePBBuf != null -532 && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { -533 // duplicate the buffer. This is where the header is going to be written -534 ByteBuffer pbBuf = possiblePBBuf.duplicate(); -535 // get the current limit -536 int limit = pbBuf.limit(); -537 // Position such that we write the header to the end of the buffer -538 pbBuf.position(limit); -539 // limit to the header size -540 pbBuf.limit(totalPBSize + limit); -541 // mark the current position -542 pbBuf.mark(); -543 writeToCOS(result, header, totalSize, pbBuf); -544 // reset the buffer back to old position -545 pbBuf.reset(); -546 return pbBuf; -547 } else { -548 return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); -549 } -550 } -551 -552 private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) -553 throws IOException { -554 ByteBufferUtils.putInt(pbBuf, totalSize); -555 // create COS that works on BB -556 CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf); -557 if (header != null) { -558 cos.writeMessageNoTag(header); -559 } -560 if (result != null) { -561 cos.writeMessageNoTag(result); -562 } -563 cos.flush(); -564 cos.checkNoSpaceLeft(); -565 } -566 -567 private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, -568 int totalSize, int totalPBSize) throws IOException { -569 ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize); -570 writeToCOS(result, header, totalSize, pbBuf); -571 pbBuf.flip(); -572 return pbBuf; -573 } -574 -575 protected BufferChain wrapWithSasl(BufferChain bc) -576 throws IOException { -577 if (!this.connection.useSasl) return bc; -578 // Looks like no way around this; saslserver wants a byte array. I have to make it one. -579 // THIS IS A BIG UGLY COPY. -580 byte [] responseBytes = bc.getBytes(); -581 byte [] token; -582 // synchronization may be needed since there can be multiple Handler -583 // threads using saslServer or Crypto AES to wrap responses. -584 if (connection.useCryptoAesWrap) { -585 // wrap with Crypto AES -586 synchronized (connection.cryptoAES) { -587 token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length); -588 } -589 } else { -590 synchronized (connection.saslServer) { -591 token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); -592 } -593 } -594 if (LOG.isTraceEnabled()) { -595 LOG.trace("Adding saslServer wrapped token of size " + token.length -596 + " as call response."); -597 } -598 -599 ByteBuffer[] responseBufs = new ByteBuffer[2]; -600 responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length)); -601 responseBufs[1] = ByteBuffer.wrap(token); -602 return new BufferChain(responseBufs); -603 } -604 -605 @Override -606 public boolean isClientCellBlockSupported() { -