Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EE0A5184D2 for ; Wed, 24 Feb 2016 17:45:09 +0000 (UTC) Received: (qmail 81552 invoked by uid 500); 24 Feb 2016 17:45:02 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 81378 invoked by uid 500); 24 Feb 2016 17:45:02 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 80429 invoked by uid 99); 24 Feb 2016 17:45:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Feb 2016 17:45:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 35453E8EBD; Wed, 24 Feb 2016 17:45:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Wed, 24 Feb 2016 17:45:20 -0000 Message-Id: <489c2dcaa30b46dda2159eccf798e51c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [21/30] hbase-site git commit: Published site at 28cd48b673ca743d193874b2951bc995699e8e89. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/89b638a4/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.Listener.Reader.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.Listener.Reader.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.Listener.Reader.html index e5d9af6..ea9ea4d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.Listener.Reader.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcServer.Listener.Reader.html @@ -2107,556 +2107,559 @@ 2099 @Override 2100 public void onConfigurationChange(Configuration newConf) { 2101 initReconfigurable(newConf); -2102 } -2103 -2104 private void initReconfigurable(Configuration confToLoad) { -2105 this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false); -2106 if (isSecurityEnabled && allowFallbackToSimpleAuth) { -2107 LOG.warn("********* WARNING! *********"); -2108 LOG.warn("This server is configured to allow connections from INSECURE clients"); -2109 LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true)."); -2110 LOG.warn("While this option is enabled, client identities cannot be secured, and user"); -2111 LOG.warn("impersonation is possible!"); -2112 LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,"); -2113 LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml"); -2114 LOG.warn("****************************"); -2115 } -2116 } -2117 -2118 /** -2119 * Subclasses of HBaseServer can override this to provide their own -2120 * Connection implementations. -2121 */ -2122 protected Connection getConnection(SocketChannel channel, long time) { -2123 return new Connection(channel, time); -2124 } -2125 -2126 /** -2127 * Setup response for the RPC Call. -2128 * -2129 * @param response buffer to serialize the response into -2130 * @param call {@link Call} to which we are setting up the response -2131 * @param error error message, if the call failed -2132 * @throws IOException -2133 */ -2134 private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error) -2135 throws IOException { -2136 if (response != null) response.reset(); -2137 call.setResponse(null, null, t, error); -2138 } -2139 -2140 protected void closeConnection(Connection connection) { -2141 synchronized (connectionList) { -2142 if (connectionList.remove(connection)) { -2143 numConnections--; -2144 } -2145 } -2146 connection.close(); -2147 } -2148 -2149 Configuration getConf() { -2150 return conf; -2151 } -2152 -2153 /** Sets the socket buffer size used for responding to RPCs. -2154 * @param size send size -2155 */ -2156 @Override -2157 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } -2158 +2102 if (scheduler instanceof ConfigurationObserver) { +2103 ((ConfigurationObserver)scheduler).onConfigurationChange(newConf); +2104 } +2105 } +2106 +2107 private void initReconfigurable(Configuration confToLoad) { +2108 this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false); +2109 if (isSecurityEnabled && allowFallbackToSimpleAuth) { +2110 LOG.warn("********* WARNING! *********"); +2111 LOG.warn("This server is configured to allow connections from INSECURE clients"); +2112 LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true)."); +2113 LOG.warn("While this option is enabled, client identities cannot be secured, and user"); +2114 LOG.warn("impersonation is possible!"); +2115 LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,"); +2116 LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml"); +2117 LOG.warn("****************************"); +2118 } +2119 } +2120 +2121 /** +2122 * Subclasses of HBaseServer can override this to provide their own +2123 * Connection implementations. +2124 */ +2125 protected Connection getConnection(SocketChannel channel, long time) { +2126 return new Connection(channel, time); +2127 } +2128 +2129 /** +2130 * Setup response for the RPC Call. +2131 * +2132 * @param response buffer to serialize the response into +2133 * @param call {@link Call} to which we are setting up the response +2134 * @param error error message, if the call failed +2135 * @throws IOException +2136 */ +2137 private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error) +2138 throws IOException { +2139 if (response != null) response.reset(); +2140 call.setResponse(null, null, t, error); +2141 } +2142 +2143 protected void closeConnection(Connection connection) { +2144 synchronized (connectionList) { +2145 if (connectionList.remove(connection)) { +2146 numConnections--; +2147 } +2148 } +2149 connection.close(); +2150 } +2151 +2152 Configuration getConf() { +2153 return conf; +2154 } +2155 +2156 /** Sets the socket buffer size used for responding to RPCs. +2157 * @param size send size +2158 */ 2159 @Override -2160 public boolean isStarted() { -2161 return this.started; -2162 } -2163 -2164 /** Starts the service. Must be called before any calls will be handled. */ -2165 @Override -2166 public synchronized void start() { -2167 if (started) return; -2168 authTokenSecretMgr = createSecretManager(); -2169 if (authTokenSecretMgr != null) { -2170 setSecretManager(authTokenSecretMgr); -2171 authTokenSecretMgr.start(); -2172 } -2173 this.authManager = new ServiceAuthorizationManager(); -2174 HBasePolicyProvider.init(conf, authManager); -2175 responder.start(); -2176 listener.start(); -2177 scheduler.start(); -2178 started = true; -2179 } -2180 -2181 @Override -2182 public synchronized void refreshAuthManager(PolicyProvider pp) { -2183 // Ignore warnings that this should be accessed in a static way instead of via an instance; -2184 // it'll break if you go via static route. -2185 this.authManager.refresh(this.conf, pp); -2186 } -2187 -2188 private AuthenticationTokenSecretManager createSecretManager() { -2189 if (!isSecurityEnabled) return null; -2190 if (server == null) return null; -2191 Configuration conf = server.getConfiguration(); -2192 long keyUpdateInterval = -2193 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000); -2194 long maxAge = -2195 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000); -2196 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(), -2197 server.getServerName().toString(), keyUpdateInterval, maxAge); -2198 } -2199 -2200 public SecretManager<? extends TokenIdentifier> getSecretManager() { -2201 return this.secretManager; -2202 } -2203 -2204 @SuppressWarnings("unchecked") -2205 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) { -2206 this.secretManager = (SecretManager<TokenIdentifier>) secretManager; -2207 } -2208 -2209 /** -2210 * This is a server side method, which is invoked over RPC. On success -2211 * the return response has protobuf response payload. On failure, the -2212 * exception name and the stack trace are returned in the protobuf response. -2213 */ -2214 @Override -2215 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, -2216 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) -2217 throws IOException { -2218 try { -2219 status.setRPC(md.getName(), new Object[]{param}, receiveTime); -2220 // TODO: Review after we add in encoded data blocks. -2221 status.setRPCPacket(param); -2222 status.resume("Servicing call"); -2223 //get an instance of the method arg type -2224 long startTime = System.currentTimeMillis(); -2225 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); -2226 Message result = service.callBlockingMethod(md, controller, param); -2227 long endTime = System.currentTimeMillis(); -2228 int processingTime = (int) (endTime - startTime); -2229 int qTime = (int) (startTime - receiveTime); -2230 int totalTime = (int) (endTime - receiveTime); -2231 if (LOG.isTraceEnabled()) { -2232 LOG.trace(CurCall.get().toString() + -2233 ", response " + TextFormat.shortDebugString(result) + -2234 " queueTime: " + qTime + -2235 " processingTime: " + processingTime + -2236 " totalTime: " + totalTime); -2237 } -2238 long requestSize = param.getSerializedSize(); -2239 long responseSize = result.getSerializedSize(); -2240 metrics.dequeuedCall(qTime); -2241 metrics.processedCall(processingTime); -2242 metrics.totalCall(totalTime); -2243 metrics.receivedRequest(requestSize); -2244 metrics.sentResponse(responseSize); -2245 // log any RPC responses that are slower than the configured warn -2246 // response time or larger than configured warning size -2247 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1); -2248 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1); -2249 if (tooSlow || tooLarge) { -2250 // when tagging, we let TooLarge trump TooSmall to keep output simple -2251 // note that large responses will often also be slow. -2252 logResponse(new Object[]{param}, -2253 md.getName(), md.getName() + "(" + param.getClass().getName() + ")", -2254 (tooLarge ? "TooLarge" : "TooSlow"), -2255 status.getClient(), startTime, processingTime, qTime, -2256 responseSize); -2257 } -2258 return new Pair<Message, CellScanner>(result, controller.cellScanner()); -2259 } catch (Throwable e) { -2260 // The above callBlockingMethod will always return a SE. Strip the SE wrapper before -2261 // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't -2262 // need to pass it over the wire. -2263 if (e instanceof ServiceException) e = e.getCause(); -2264 -2265 // increment the number of requests that were exceptions. -2266 metrics.exception(e); +2160 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } +2161 +2162 @Override +2163 public boolean isStarted() { +2164 return this.started; +2165 } +2166 +2167 /** Starts the service. Must be called before any calls will be handled. */ +2168 @Override +2169 public synchronized void start() { +2170 if (started) return; +2171 authTokenSecretMgr = createSecretManager(); +2172 if (authTokenSecretMgr != null) { +2173 setSecretManager(authTokenSecretMgr); +2174 authTokenSecretMgr.start(); +2175 } +2176 this.authManager = new ServiceAuthorizationManager(); +2177 HBasePolicyProvider.init(conf, authManager); +2178 responder.start(); +2179 listener.start(); +2180 scheduler.start(); +2181 started = true; +2182 } +2183 +2184 @Override +2185 public synchronized void refreshAuthManager(PolicyProvider pp) { +2186 // Ignore warnings that this should be accessed in a static way instead of via an instance; +2187 // it'll break if you go via static route. +2188 this.authManager.refresh(this.conf, pp); +2189 } +2190 +2191 private AuthenticationTokenSecretManager createSecretManager() { +2192 if (!isSecurityEnabled) return null; +2193 if (server == null) return null; +2194 Configuration conf = server.getConfiguration(); +2195 long keyUpdateInterval = +2196 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000); +2197 long maxAge = +2198 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000); +2199 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(), +2200 server.getServerName().toString(), keyUpdateInterval, maxAge); +2201 } +2202 +2203 public SecretManager<? extends TokenIdentifier> getSecretManager() { +2204 return this.secretManager; +2205 } +2206 +2207 @SuppressWarnings("unchecked") +2208 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) { +2209 this.secretManager = (SecretManager<TokenIdentifier>) secretManager; +2210 } +2211 +2212 /** +2213 * This is a server side method, which is invoked over RPC. On success +2214 * the return response has protobuf response payload. On failure, the +2215 * exception name and the stack trace are returned in the protobuf response. +2216 */ +2217 @Override +2218 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, +2219 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) +2220 throws IOException { +2221 try { +2222 status.setRPC(md.getName(), new Object[]{param}, receiveTime); +2223 // TODO: Review after we add in encoded data blocks. +2224 status.setRPCPacket(param); +2225 status.resume("Servicing call"); +2226 //get an instance of the method arg type +2227 long startTime = System.currentTimeMillis(); +2228 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); +2229 Message result = service.callBlockingMethod(md, controller, param); +2230 long endTime = System.currentTimeMillis(); +2231 int processingTime = (int) (endTime - startTime); +2232 int qTime = (int) (startTime - receiveTime); +2233 int totalTime = (int) (endTime - receiveTime); +2234 if (LOG.isTraceEnabled()) { +2235 LOG.trace(CurCall.get().toString() + +2236 ", response " + TextFormat.shortDebugString(result) + +2237 " queueTime: " + qTime + +2238 " processingTime: " + processingTime + +2239 " totalTime: " + totalTime); +2240 } +2241 long requestSize = param.getSerializedSize(); +2242 long responseSize = result.getSerializedSize(); +2243 metrics.dequeuedCall(qTime); +2244 metrics.processedCall(processingTime); +2245 metrics.totalCall(totalTime); +2246 metrics.receivedRequest(requestSize); +2247 metrics.sentResponse(responseSize); +2248 // log any RPC responses that are slower than the configured warn +2249 // response time or larger than configured warning size +2250 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1); +2251 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1); +2252 if (tooSlow || tooLarge) { +2253 // when tagging, we let TooLarge trump TooSmall to keep output simple +2254 // note that large responses will often also be slow. +2255 logResponse(new Object[]{param}, +2256 md.getName(), md.getName() + "(" + param.getClass().getName() + ")", +2257 (tooLarge ? "TooLarge" : "TooSlow"), +2258 status.getClient(), startTime, processingTime, qTime, +2259 responseSize); +2260 } +2261 return new Pair<Message, CellScanner>(result, controller.cellScanner()); +2262 } catch (Throwable e) { +2263 // The above callBlockingMethod will always return a SE. Strip the SE wrapper before +2264 // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't +2265 // need to pass it over the wire. +2266 if (e instanceof ServiceException) e = e.getCause(); 2267 -2268 if (e instanceof LinkageError) throw new DoNotRetryIOException(e); -2269 if (e instanceof IOException) throw (IOException)e; -2270 LOG.error("Unexpected throwable object ", e); -2271 throw new IOException(e.getMessage(), e); -2272 } -2273 } -2274 -2275 /** -2276 * Logs an RPC response to the LOG file, producing valid JSON objects for -2277 * client Operations. -2278 * @param params The parameters received in the call. -2279 * @param methodName The name of the method invoked -2280 * @param call The string representation of the call -2281 * @param tag The tag that will be used to indicate this event in the log. -2282 * @param clientAddress The address of the client who made this call. -2283 * @param startTime The time that the call was initiated, in ms. -2284 * @param processingTime The duration that the call took to run, in ms. -2285 * @param qTime The duration that the call spent on the queue -2286 * prior to being initiated, in ms. -2287 * @param responseSize The size in bytes of the response buffer. -2288 */ -2289 void logResponse(Object[] params, String methodName, String call, String tag, -2290 String clientAddress, long startTime, int processingTime, int qTime, -2291 long responseSize) -2292 throws IOException { -2293 // base information that is reported regardless of type of call -2294 Map<String, Object> responseInfo = new HashMap<String, Object>(); -2295 responseInfo.put("starttimems", startTime); -2296 responseInfo.put("processingtimems", processingTime); -2297 responseInfo.put("queuetimems", qTime); -2298 responseInfo.put("responsesize", responseSize); -2299 responseInfo.put("client", clientAddress); -2300 responseInfo.put("class", server == null? "": server.getClass().getSimpleName()); -2301 responseInfo.put("method", methodName); -2302 if (params.length == 2 && server instanceof HRegionServer && -2303 params[0] instanceof byte[] && -2304 params[1] instanceof Operation) { -2305 // if the slow process is a query, we want to log its table as well -2306 // as its own fingerprint -2307 TableName tableName = TableName.valueOf( -2308 HRegionInfo.parseRegionName((byte[]) params[0])[0]); -2309 responseInfo.put("table", tableName.getNameAsString()); -2310 // annotate the response map with operation details -2311 responseInfo.putAll(((Operation) params[1]).toMap()); -2312 // report to the log file -2313 LOG.warn("(operation" + tag + "): " + -2314 MAPPER.writeValueAsString(responseInfo)); -2315 } else if (params.length == 1 && server instanceof HRegionServer && -2316 params[0] instanceof Operation) { -2317 // annotate the response map with operation details -2318 responseInfo.putAll(((Operation) params[0]).toMap()); -2319 // report to the log file -2320 LOG.warn("(operation" + tag + "): " + -2321 MAPPER.writeValueAsString(responseInfo)); -2322 } else { -2323 // can't get JSON details, so just report call.toString() along with -2324 // a more generic tag. -2325 responseInfo.put("call", call); -2326 LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo)); -2327 } -2328 } -2329 -2330 /** Stops the service. No new calls will be handled after this is called. */ -2331 @Override -2332 public synchronized void stop() { -2333 LOG.info("Stopping server on " + port); -2334 running = false; -2335 if (authTokenSecretMgr != null) { -2336 authTokenSecretMgr.stop(); -2337 authTokenSecretMgr = null; -2338 } -2339 listener.interrupt(); -2340 listener.doStop(); -2341 responder.interrupt(); -2342 scheduler.stop(); -2343 notifyAll(); -2344 } -2345 -2346 /** Wait for the server to be stopped. -2347 * Does not wait for all subthreads to finish. -2348 * See {@link #stop()}. -2349 * @throws InterruptedException e -2350 */ -2351 @Override -2352 public synchronized void join() throws InterruptedException { -2353 while (running) { -2354 wait(); -2355 } -2356 } -2357 -2358 /** -2359 * Return the socket (ip+port) on which the RPC server is listening to. May return null if -2360 * the listener channel is closed. -2361 * @return the socket (ip+port) on which the RPC server is listening to, or null if this -2362 * information cannot be determined -2363 */ -2364 @Override -2365 public synchronized InetSocketAddress getListenerAddress() { -2366 if (listener == null) { -2367 return null; -2368 } -2369 return listener.getAddress(); -2370 } -2371 -2372 /** -2373 * Set the handler for calling out of RPC for error conditions. -2374 * @param handler the handler implementation -2375 */ -2376 @Override -2377 public void setErrorHandler(HBaseRPCErrorHandler handler) { -2378 this.errorHandler = handler; -2379 } -2380 -2381 @Override -2382 public HBaseRPCErrorHandler getErrorHandler() { -2383 return this.errorHandler; -2384 } -2385 -2386 /** -2387 * Returns the metrics instance for reporting RPC call statistics -2388 */ -2389 @Override -2390 public MetricsHBaseServer getMetrics() { -2391 return metrics; -2392 } -2393 -2394 @Override -2395 public void addCallSize(final long diff) { -2396 this.callQueueSize.add(diff); -2397 } -2398 -2399 /** -2400 * Authorize the incoming client connection. -2401 * -2402 * @param user client user -2403 * @param connection incoming connection -2404 * @param addr InetAddress of incoming connection -2405 * @throws org.apache.hadoop.security.authorize.AuthorizationException -2406 * when the client isn't authorized to talk the protocol -2407 */ -2408 public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection, -2409 InetAddress addr) -2410 throws AuthorizationException { -2411 if (authorize) { -2412 Class<?> c = getServiceInterface(services, connection.getServiceName()); -2413 this.authManager.authorize(user != null ? user : null, c, getConf(), addr); -2414 } -2415 } -2416 -2417 /** -2418 * When the read or write buffer size is larger than this limit, i/o will be -2419 * done in chunks of this size. Most RPC requests and responses would be -2420 * be smaller. -2421 */ -2422 private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB. -2423 -2424 /** -2425 * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}. -2426 * If the amount of data is large, it writes to channel in smaller chunks. -2427 * This is to avoid jdk from creating many direct buffers as the size of -2428 * buffer increases. This also minimizes extra copies in NIO layer -2429 * as a result of multiple write operations required to write a large -2430 * buffer. -2431 * -2432 * @param channel writable byte channel to write to -2433 * @param bufferChain Chain of buffers to write -2434 * @return number of bytes written -2435 * @throws java.io.IOException e -2436 * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer) -2437 */ -2438 protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain) -2439 throws IOException { -2440 long count = bufferChain.write(channel, NIO_BUFFER_LIMIT); -2441 if (count > 0) this.metrics.sentBytes(count); -2442 return count; -2443 } -2444 -2445 /** -2446 * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. -2447 * If the amount of data is large, it writes to channel in smaller chunks. -2448 * This is to avoid jdk from creating many direct buffers as the size of -2449 * ByteBuffer increases. There should not be any performance degredation. -2450 * -2451 * @param channel writable byte channel to write on -2452 * @param buffer buffer to write -2453 * @return number of bytes written -2454 * @throws java.io.IOException e -2455 * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer) -2456 */ -2457 protected int channelRead(ReadableByteChannel channel, -2458 ByteBuffer buffer) throws IOException { -2459 -2460 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? -2461 channel.read(buffer) : channelIO(channel, null, buffer); -2462 if (count > 0) { -2463 metrics.receivedBytes(count); -2464 } -2465 return count; -2466 } -2467 -2468 /** -2469 * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)} -2470 * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only -2471 * one of readCh or writeCh should be non-null. -2472 * -2473 * @param readCh read channel -2474 * @param writeCh write channel -2475 * @param buf buffer to read or write into/out of -2476 * @return bytes written -2477 * @throws java.io.IOException e -2478 * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) -2479 * @see #channelWrite(GatheringByteChannel, BufferChain) -2480 */ -2481 private static int channelIO(ReadableByteChannel readCh, -2482 WritableByteChannel writeCh, -2483 ByteBuffer buf) throws IOException { -2484 -2485 int originalLimit = buf.limit(); -2486 int initialRemaining = buf.remaining(); -2487 int ret = 0; -2488 -2489 while (buf.remaining() > 0) { -2490 try { -2491 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); -2492 buf.limit(buf.position() + ioSize); -2493 -2494 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); -2495 -2496 if (ret < ioSize) { -2497 break; -2498 } -2499 -2500 } finally { -2501 buf.limit(originalLimit); -2502 } -2503 } -2504 -2505 int nBytes = initialRemaining - buf.remaining(); -2506 return (nBytes > 0) ? nBytes : ret; -2507 } -2508 -2509 /** -2510 * Needed for features such as delayed calls. We need to be able to store the current call -2511 * so that we can complete it later or ask questions of what is supported by the current ongoing -2512 * call. -2513 * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local) -2514 */ -2515 public static RpcCallContext getCurrentCall() { -2516 return CurCall.get(); -2517 } -2518 -2519 public static boolean isInRpcCallContext() { -2520 return CurCall.get() != null; -2521 } -2522 -2523 /** -2524 * Returns the user credentials associated with the current RPC request or -2525 * <code>null</code> if no credentials were provided. -2526 * @return A User -2527 */ -2528 public static User getRequestUser() { -2529 RpcCallContext ctx = getCurrentCall(); -2530 return ctx == null? null: ctx.getRequestUser(); -2531 } -2532 -2533 /** -2534 * Returns the username for any user associated with the current RPC -2535 * request or <code>null</code> if no user is set. -2536 */ -2537 public static String getRequestUserName() { -2538 User user = getRequestUser(); -2539 return user == null? null: user.getShortName(); -2540 } -2541 -2542 /** -2543 * @return Address of remote client if a request is ongoing, else null -2544 */ -2545 public static InetAddress getRemoteAddress() { -2546 RpcCallContext ctx = getCurrentCall(); -2547 return ctx == null? null: ctx.getRemoteAddress(); -2548 } -2549 -2550 /** -2551 * @param serviceName Some arbitrary string that represents a 'service'. -2552 * @param services Available service instances -2553 * @return Matching BlockingServiceAndInterface pair -2554 */ -2555 static BlockingServiceAndInterface getServiceAndInterface( -2556 final List<BlockingServiceAndInterface> services, final String serviceName) { -2557 for (BlockingServiceAndInterface bs : services) { -2558 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) { -2559 return bs; -2560 } -2561 } -2562 return null; -2563 } -2564 -2565 /** -2566 * @param serviceName Some arbitrary string that represents a 'service'. -2567 * @param services Available services and their service interfaces. -2568 * @return Service interface class for <code>serviceName</code> -2569 */ -2570 static Class<?> getServiceInterface( -2571 final List<BlockingServiceAndInterface> services, -2572 final String serviceName) { -2573 BlockingServiceAndInterface bsasi = -2574 getServiceAndInterface(services, serviceName); -2575 return bsasi == null? null: bsasi.getServiceInterface(); -2576 } -2577 -2578 /** -2579 * @param serviceName Some arbitrary string that represents a 'service'. -2580 * @param services Available services and their service interfaces. -2581 * @return BlockingService that goes with the passed <code>serviceName</code> -2582 */ -2583 static BlockingService getService( -2584 final List<BlockingServiceAndInterface> services, -2585 final String serviceName) { -2586 BlockingServiceAndInterface bsasi = -2587 getServiceAndInterface(services, serviceName); -2588 return bsasi == null? null: bsasi.getBlockingService(); -2589 } -2590 -2591 static MonitoredRPCHandler getStatus() { -2592 // It is ugly the way we park status up in RpcServer. Let it be for now. TODO. -2593 MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get(); -2594 if (status != null) { -2595 return status; -2596 } -2597 status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); -2598 status.pause("Waiting for a call"); -2599 RpcServer.MONITORED_RPC.set(status); -2600 return status; -2601 } -2602 -2603 /** Returns the remote side ip address when invoked inside an RPC -2604 * Returns null incase of an error. -2605 * @return InetAddress -2606 */ -2607 public static InetAddress getRemoteIp() { -2608 Call call = CurCall.get(); -2609 if (call != null && call.connection != null && call.connection.socket != null) { -2610 return call.connection.socket.getInetAddress(); -2611 } -2612 return null; -2613 } -2614 -2615 -2616 /** -2617 * A convenience method to bind to a given address and report -2618 * better exceptions if the address is not a valid host. -2619 * @param socket the socket to bind -2620 * @param address the address to bind to -2621 * @param backlog the number of connections allowed in the queue -2622 * @throws BindException if the address can't be bound -2623 * @throws UnknownHostException if the address isn't a valid host name -2624 * @throws IOException other random errors from bind -2625 */ -2626 public static void bind(ServerSocket socket, InetSocketAddress address, -2627 int backlog) throws IOException { -2628 try { -2629 socket.bind(address, backlog); -2630 } catch (BindException e) { -2631 BindException bindException = -2632 new BindException("Problem binding to " + address + " : " + -2633 e.getMessage()); -2634 bindException.initCause(e); -2635 throw bindException; -2636 } catch (SocketException e) { -2637 // If they try to bind to a different host's address, give a better -2638 // error message. -2639 if ("Unresolved address".equals(e.getMessage())) { -2640 throw new UnknownHostException("Invalid hostname for server: " + -2641 address.getHostName()); -2642 } -2643 throw e; -2644 } -2645 } -2646 -2647 @Override -2648 public RpcScheduler getScheduler() { -2649 return scheduler; -2650 } -2651} +2268 // increment the number of requests that were exceptions. +2269 metrics.exception(e); +2270 +2271 if (e instanceof LinkageError) throw new DoNotRetryIOException(e); +2272 if (e instanceof IOException) throw (IOException)e; +2273 LOG.error("Unexpected throwable object ", e); +2274 throw new IOException(e.getMessage(), e); +2275 } +2276 } +2277 +2278 /** +2279 * Logs an RPC response to the LOG file, producing valid JSON objects for +2280 * client Operations. +2281 * @param params The parameters received in the call. +2282 * @param methodName The name of the method invoked +2283 * @param call The string representation of the call +2284 * @param tag The tag that will be used to indicate this event in the log. +2285 * @param clientAddress The address of the client who made this call. +2286 * @param startTime The time that the call was initiated, in ms. +2287 * @param processingTime The duration that the call took to run, in ms. +2288 * @param qTime The duration that the call spent on the queue +2289 * prior to being initiated, in ms. +2290 * @param responseSize The size in bytes of the response buffer. +2291 */ +2292 void logResponse(Object[] params, String methodName, String call, String tag, +2293 String clientAddress, long startTime, int processingTime, int qTime, +2294 long responseSize) +2295 throws IOException { +2296 // base information that is reported regardless of type of call +2297 Map<String, Object> responseInfo = new HashMap<String, Object>(); +2298 responseInfo.put("starttimems", startTime); +2299 responseInfo.put("processingtimems", processingTime); +2300 responseInfo.put("queuetimems", qTime); +2301 responseInfo.put("responsesize", responseSize); +2302 responseInfo.put("client", clientAddress); +2303 responseInfo.put("class", server == null? "": server.getClass().getSimpleName()); +2304 responseInfo.put("method", methodName); +2305 if (params.length == 2 && server instanceof HRegionServer && +2306 params[0] instanceof byte[] && +2307 params[1] instanceof Operation) { +2308 // if the slow process is a query, we want to log its table as well +2309 // as its own fingerprint +2310 TableName tableName = TableName.valueOf( +2311 HRegionInfo.parseRegionName((byte[]) params[0])[0]); +2312 responseInfo.put("table", tableName.getNameAsString()); +2313 // annotate the response map with operation details +2314 responseInfo.putAll(((Operation) params[1]).toMap()); +2315 // report to the log file +2316 LOG.warn("(operation" + tag + "): " + +2317 MAPPER.writeValueAsString(responseInfo)); +2318 } else if (params.length == 1 && server instanceof HRegionServer && +2319 params[0] instanceof Operation) { +2320 // annotate the response map with operation details +2321 responseInfo.putAll(((Operation) params[0]).toMap()); +2322 // report to the log file +2323 LOG.warn("(operation" + tag + "): " + +2324 MAPPER.writeValueAsString(responseInfo)); +2325 } else { +2326 // can't get JSON details, so just report call.toString() along with +2327 // a more generic tag. +2328 responseInfo.put("call", call); +2329 LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo)); +2330 } +2331 } +2332 +2333 /** Stops the service. No new calls will be handled after this is called. */ +2334 @Override +2335 public synchronized void stop() { +2336 LOG.info("Stopping server on " + port); +2337 running = false; +2338 if (authTokenSecretMgr != null) { +2339 authTokenSecretMgr.stop(); +2340 authTokenSecretMgr = null; +2341 } +2342 listener.interrupt(); +2343 listener.doStop(); +2344 responder.interrupt(); +2345 scheduler.stop(); +2346 notifyAll(); +2347 } +2348 +2349 /** Wait for the server to be stopped. +2350 * Does not wait for all subthreads to finish. +2351 * See {@link #stop()}. +2352 * @throws InterruptedException e +2353 */ +2354 @Override +2355 public synchronized void join() throws InterruptedException { +2356 while (running) { +2357 wait(); +2358 } +2359 } +2360 +2361 /** +2362 * Return the socket (ip+port) on which the RPC server is listening to. May return null if +2363 * the listener channel is closed. +2364 * @return the socket (ip+port) on which the RPC server is listening to, or null if this +2365 * information cannot be determined +2366 */ +2367 @Override +2368 public synchronized InetSocketAddress getListenerAddress() { +2369 if (listener == null) { +2370 return null; +2371 } +2372 return listener.getAddress(); +2373 } +2374 +2375 /** +2376 * Set the handler for calling out of RPC for error conditions. +2377 * @param handler the handler implementation +2378 */ +2379 @Override +2380 public void setErrorHandler(HBaseRPCErrorHandler handler) { +2381 this.errorHandler = handler; +2382 } +2383 +2384 @Override +2385 public HBaseRPCErrorHandler getErrorHandler() { +2386 return this.errorHandler; +2387 } +2388 +2389 /** +2390 * Returns the metrics instance for reporting RPC call statistics +2391 */ +2392 @Override +2393 public MetricsHBaseServer getMetrics() { +2394 return metrics; +2395 } +2396 +2397 @Override +2398 public void addCallSize(final long diff) { +2399 this.callQueueSize.add(diff); +2400 } +2401 +2402 /** +2403 * Authorize the incoming client connection. +2404 * +2405 * @param user client user +2406 * @param connection incoming connection +2407 * @param addr InetAddress of incoming connection +2408 * @throws org.apache.hadoop.security.authorize.AuthorizationException +2409 * when the client isn't authorized to talk the protocol +2410 */ +2411 public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection, +2412 InetAddress addr) +2413 throws AuthorizationException { +2414 if (authorize) { +2415 Class<?> c = getServiceInterface(services, connection.getServiceName()); +2416 this.authManager.authorize(user != null ? user : null, c, getConf(), addr); +2417 } +2418 } +2419 +2420 /** +2421 * When the read or write buffer size is larger than this limit, i/o will be +2422 * done in chunks of this size. Most RPC requests and responses would be +2423 * be smaller. +2424 */ +2425 private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB. +2426 +2427 /** +2428 * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.