Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7174418D51 for ; Thu, 11 Feb 2016 17:34:16 +0000 (UTC) Received: (qmail 26893 invoked by uid 500); 11 Feb 2016 17:34:11 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 26821 invoked by uid 500); 11 Feb 2016 17:34:11 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 23545 invoked by uid 99); 11 Feb 2016 17:34:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Feb 2016 17:34:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5D6CFE6996; Thu, 11 Feb 2016 17:34:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Thu, 11 Feb 2016 17:34:43 -0000 Message-Id: In-Reply-To: <33ea69418bfa4b81897bd860a19e4c52@git.apache.org> References: <33ea69418bfa4b81897bd860a19e4c52@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [37/51] [partial] hbase-site git commit: Published site at 29a192ef3cbe3b9cc12a6ee38f39e1199ac9790f. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/8bb348c6/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractRpcClient.BlockingRpcChannelImplementation.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractRpcClient.BlockingRpcChannelImplementation.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractRpcClient.BlockingRpcChannelImplementation.html index f9da92e..d77d3ab 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractRpcClient.BlockingRpcChannelImplementation.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractRpcClient.BlockingRpcChannelImplementation.html @@ -157,184 +157,189 @@ 149 } 150 } 151 -152 /** -153 * Encapsulate the ugly casting and RuntimeException conversion in private method. -154 * @param conf configuration -155 * @return The compressor to use on this client. -156 */ -157 private static CompressionCodec getCompressor(final Configuration conf) { -158 String className = conf.get("hbase.client.rpc.compressor", null); -159 if (className == null || className.isEmpty()) return null; -160 try { -161 return (CompressionCodec)Class.forName(className).newInstance(); -162 } catch (Exception e) { -163 throw new RuntimeException("Failed getting compressor " + className, e); -164 } -165 } -166 -167 /** -168 * Return the pool type specified in the configuration, which must be set to -169 * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or -170 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, -171 * otherwise default to the former. -172 * -173 * For applications with many user threads, use a small round-robin pool. For -174 * applications with few user threads, you may want to try using a -175 * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} -176 * instances should not exceed the operating system's hard limit on the number of -177 * connections. -178 * -179 * @param config configuration -180 * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or -181 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} -182 */ -183 protected static PoolMap.PoolType getPoolType(Configuration config) { -184 return PoolMap.PoolType -185 .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin, -186 PoolMap.PoolType.ThreadLocal); -187 } -188 -189 /** -190 * Return the pool size specified in the configuration, which is applicable only if -191 * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. -192 * -193 * @param config configuration -194 * @return the maximum pool size -195 */ -196 protected static int getPoolSize(Configuration config) { -197 return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); -198 } -199 -200 /** -201 * Make a blocking call. Throws exceptions if there are network problems or if the remote code -202 * threw an exception. -203 * -204 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. -205 * {@link UserProvider#getCurrent()} makes a new instance of User each time so -206 * will be a -207 * new Connection each time. -208 * @return A pair with the Message response and the Cell data (if any). -209 */ -210 Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, -211 Message param, Message returnType, final User ticket, final InetSocketAddress isa) -212 throws ServiceException { -213 if (pcrc == null) { -214 pcrc = new PayloadCarryingRpcController(); -215 } -216 -217 Pair<Message, CellScanner> val; -218 try { -219 final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); -220 cs.setStartTime(EnvironmentEdgeManager.currentTime()); -221 val = call(pcrc, md, param, returnType, ticket, isa, cs); -222 // Shove the results into controller so can be carried across the proxy/pb service void. -223 pcrc.setCellScanner(val.getSecond()); -224 -225 cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); -226 if (metrics != null) { -227 metrics.updateRpc(md, param, cs); -228 } -229 if (LOG.isTraceEnabled()) { -230 LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); -231 } -232 return val.getFirst(); -233 } catch (Throwable e) { -234 throw new ServiceException(e); -235 } -236 } -237 -238 /** -239 * Make a call, passing <code>param</code>, to the IPC server running at -240 * <code>address</code> which is servicing the <code>protocol</code> protocol, -241 * with the <code>ticket</code> credentials, returning the value. -242 * Throws exceptions if there are network problems or if the remote code -243 * threw an exception. -244 * -245 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. -246 * {@link UserProvider#getCurrent()} makes a new instance of User each time so -247 * will be a -248 * new Connection each time. -249 * @return A pair with the Message response and the Cell data (if any). -250 * @throws InterruptedException -251 * @throws java.io.IOException -252 */ -253 protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, -254 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, -255 InetSocketAddress isa, MetricsConnection.CallStats callStats) -256 throws IOException, InterruptedException; -257 -258 @Override -259 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, -260 int defaultOperationTimeout) { -261 return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); -262 } -263 -264 /** -265 * Takes an Exception and the address we were trying to connect to and return an IOException with -266 * the input exception as the cause. The new exception provides the stack trace of the place where -267 * the exception is thrown and some extra diagnostics information. If the exception is -268 * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return -269 * an IOException. -270 * @param addr target address -271 * @param exception the relevant exception -272 * @return an exception to throw -273 */ -274 protected IOException wrapException(InetSocketAddress addr, Exception exception) { -275 if (exception instanceof ConnectException) { -276 // connection refused; include the host:port in the error -277 return (ConnectException) new ConnectException("Call to " + addr -278 + " failed on connection exception: " + exception).initCause(exception); -279 } else if (exception instanceof SocketTimeoutException) { -280 return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr -281 + " failed because " + exception).initCause(exception); -282 } else if (exception instanceof ConnectionClosingException) { -283 return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr -284 + " failed on local exception: " + exception).initCause(exception); -285 } else { -286 return (IOException) new IOException("Call to " + addr + " failed on local exception: " -287 + exception).initCause(exception); -288 } -289 } -290 -291 /** -292 * Blocking rpc channel that goes via hbase rpc. -293 */ -294 @VisibleForTesting -295 public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { -296 private final InetSocketAddress isa; -297 private final AbstractRpcClient rpcClient; -298 private final User ticket; -299 private final int channelOperationTimeout; -300 -301 /** -302 * @param channelOperationTimeout - the default timeout when no timeout is given -303 */ -304 protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient, -305 final ServerName sn, final User ticket, int channelOperationTimeout) { -306 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); -307 this.rpcClient = rpcClient; -308 this.ticket = ticket; -309 this.channelOperationTimeout = channelOperationTimeout; -310 } -311 -312 @Override -313 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, -314 Message param, Message returnType) throws ServiceException { -315 PayloadCarryingRpcController pcrc; -316 if (controller != null) { -317 pcrc = (PayloadCarryingRpcController) controller; -318 if (!pcrc.hasCallTimeout()) { -319 pcrc.setCallTimeout(channelOperationTimeout); -320 } -321 } else { -322 pcrc = new PayloadCarryingRpcController(); -323 pcrc.setCallTimeout(channelOperationTimeout); -324 } -325 -326 return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); -327 } -328 } -329} +152 @Override +153 public boolean hasCellBlockSupport() { +154 return this.codec != null; +155 } +156 +157 /** +158 * Encapsulate the ugly casting and RuntimeException conversion in private method. +159 * @param conf configuration +160 * @return The compressor to use on this client. +161 */ +162 private static CompressionCodec getCompressor(final Configuration conf) { +163 String className = conf.get("hbase.client.rpc.compressor", null); +164 if (className == null || className.isEmpty()) return null; +165 try { +166 return (CompressionCodec)Class.forName(className).newInstance(); +167 } catch (Exception e) { +168 throw new RuntimeException("Failed getting compressor " + className, e); +169 } +170 } +171 +172 /** +173 * Return the pool type specified in the configuration, which must be set to +174 * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or +175 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, +176 * otherwise default to the former. +177 * +178 * For applications with many user threads, use a small round-robin pool. For +179 * applications with few user threads, you may want to try using a +180 * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} +181 * instances should not exceed the operating system's hard limit on the number of +182 * connections. +183 * +184 * @param config configuration +185 * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or +186 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} +187 */ +188 protected static PoolMap.PoolType getPoolType(Configuration config) { +189 return PoolMap.PoolType +190 .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin, +191 PoolMap.PoolType.ThreadLocal); +192 } +193 +194 /** +195 * Return the pool size specified in the configuration, which is applicable only if +196 * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. +197 * +198 * @param config configuration +199 * @return the maximum pool size +200 */ +201 protected static int getPoolSize(Configuration config) { +202 return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); +203 } +204 +205 /** +206 * Make a blocking call. Throws exceptions if there are network problems or if the remote code +207 * threw an exception. +208 * +209 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. +210 * {@link UserProvider#getCurrent()} makes a new instance of User each time so +211 * will be a +212 * new Connection each time. +213 * @return A pair with the Message response and the Cell data (if any). +214 */ +215 Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, +216 Message param, Message returnType, final User ticket, final InetSocketAddress isa) +217 throws ServiceException { +218 if (pcrc == null) { +219 pcrc = new PayloadCarryingRpcController(); +220 } +221 +222 Pair<Message, CellScanner> val; +223 try { +224 final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); +225 cs.setStartTime(EnvironmentEdgeManager.currentTime()); +226 val = call(pcrc, md, param, returnType, ticket, isa, cs); +227 // Shove the results into controller so can be carried across the proxy/pb service void. +228 pcrc.setCellScanner(val.getSecond()); +229 +230 cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); +231 if (metrics != null) { +232 metrics.updateRpc(md, param, cs); +233 } +234 if (LOG.isTraceEnabled()) { +235 LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); +236 } +237 return val.getFirst(); +238 } catch (Throwable e) { +239 throw new ServiceException(e); +240 } +241 } +242 +243 /** +244 * Make a call, passing <code>param</code>, to the IPC server running at +245 * <code>address</code> which is servicing the <code>protocol</code> protocol, +246 * with the <code>ticket</code> credentials, returning the value. +247 * Throws exceptions if there are network problems or if the remote code +248 * threw an exception. +249 * +250 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. +251 * {@link UserProvider#getCurrent()} makes a new instance of User each time so +252 * will be a +253 * new Connection each time. +254 * @return A pair with the Message response and the Cell data (if any). +255 * @throws InterruptedException +256 * @throws java.io.IOException +257 */ +258 protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, +259 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, +260 InetSocketAddress isa, MetricsConnection.CallStats callStats) +261 throws IOException, InterruptedException; +262 +263 @Override +264 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, +265 int defaultOperationTimeout) { +266 return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); +267 } +268 +269 /** +270 * Takes an Exception and the address we were trying to connect to and return an IOException with +271 * the input exception as the cause. The new exception provides the stack trace of the place where +272 * the exception is thrown and some extra diagnostics information. If the exception is +273 * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return +274 * an IOException. +275 * @param addr target address +276 * @param exception the relevant exception +277 * @return an exception to throw +278 */ +279 protected IOException wrapException(InetSocketAddress addr, Exception exception) { +280 if (exception instanceof ConnectException) { +281 // connection refused; include the host:port in the error +282 return (ConnectException) new ConnectException("Call to " + addr +283 + " failed on connection exception: " + exception).initCause(exception); +284 } else if (exception instanceof SocketTimeoutException) { +285 return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr +286 + " failed because " + exception).initCause(exception); +287 } else if (exception instanceof ConnectionClosingException) { +288 return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr +289 + " failed on local exception: " + exception).initCause(exception); +290 } else { +291 return (IOException) new IOException("Call to " + addr + " failed on local exception: " +292 + exception).initCause(exception); +293 } +294 } +295 +296 /** +297 * Blocking rpc channel that goes via hbase rpc. +298 */ +299 @VisibleForTesting +300 public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { +301 private final InetSocketAddress isa; +302 private final AbstractRpcClient rpcClient; +303 private final User ticket; +304 private final int channelOperationTimeout; +305 +306 /** +307 * @param channelOperationTimeout - the default timeout when no timeout is given +308 */ +309 protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient, +310 final ServerName sn, final User ticket, int channelOperationTimeout) { +311 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); +312 this.rpcClient = rpcClient; +313 this.ticket = ticket; +314 this.channelOperationTimeout = channelOperationTimeout; +315 } +316 +317 @Override +318 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, +319 Message param, Message returnType) throws ServiceException { +320 PayloadCarryingRpcController pcrc; +321 if (controller != null) { +322 pcrc = (PayloadCarryingRpcController) controller; +323 if (!pcrc.hasCallTimeout()) { +324 pcrc.setCallTimeout(channelOperationTimeout); +325 } +326 } else { +327 pcrc = new PayloadCarryingRpcController(); +328 pcrc.setCallTimeout(channelOperationTimeout); +329 } +330 +331 return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); +332 } +333 } +334} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/8bb348c6/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractRpcClient.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractRpcClient.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractRpcClient.html index f9da92e..d77d3ab 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractRpcClient.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractRpcClient.html @@ -157,184 +157,189 @@ 149 } 150 } 151 -152 /** -153 * Encapsulate the ugly casting and RuntimeException conversion in private method. -154 * @param conf configuration -155 * @return The compressor to use on this client. -156 */ -157 private static CompressionCodec getCompressor(final Configuration conf) { -158 String className = conf.get("hbase.client.rpc.compressor", null); -159 if (className == null || className.isEmpty()) return null; -160 try { -161 return (CompressionCodec)Class.forName(className).newInstance(); -162 } catch (Exception e) { -163 throw new RuntimeException("Failed getting compressor " + className, e); -164 } -165 } -166 -167 /** -168 * Return the pool type specified in the configuration, which must be set to -169 * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or -170 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, -171 * otherwise default to the former. -172 * -173 * For applications with many user threads, use a small round-robin pool. For -174 * applications with few user threads, you may want to try using a -175 * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} -176 * instances should not exceed the operating system's hard limit on the number of -177 * connections. -178 * -179 * @param config configuration -180 * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or -181 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} -182 */ -183 protected static PoolMap.PoolType getPoolType(Configuration config) { -184 return PoolMap.PoolType -185 .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin, -186 PoolMap.PoolType.ThreadLocal); -187 } -188 -189 /** -190 * Return the pool size specified in the configuration, which is applicable only if -191 * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. -192 * -193 * @param config configuration -194 * @return the maximum pool size -195 */ -196 protected static int getPoolSize(Configuration config) { -197 return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); -198 } -199 -200 /** -201 * Make a blocking call. Throws exceptions if there are network problems or if the remote code -202 * threw an exception. -203 * -204 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. -205 * {@link UserProvider#getCurrent()} makes a new instance of User each time so -206 * will be a -207 * new Connection each time. -208 * @return A pair with the Message response and the Cell data (if any). -209 */ -210 Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, -211 Message param, Message returnType, final User ticket, final InetSocketAddress isa) -212 throws ServiceException { -213 if (pcrc == null) { -214 pcrc = new PayloadCarryingRpcController(); -215 } -216 -217 Pair<Message, CellScanner> val; -218 try { -219 final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); -220 cs.setStartTime(EnvironmentEdgeManager.currentTime()); -221 val = call(pcrc, md, param, returnType, ticket, isa, cs); -222 // Shove the results into controller so can be carried across the proxy/pb service void. -223 pcrc.setCellScanner(val.getSecond()); -224 -225 cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); -226 if (metrics != null) { -227 metrics.updateRpc(md, param, cs); -228 } -229 if (LOG.isTraceEnabled()) { -230 LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); -231 } -232 return val.getFirst(); -233 } catch (Throwable e) { -234 throw new ServiceException(e); -235 } -236 } -237 -238 /** -239 * Make a call, passing <code>param</code>, to the IPC server running at -240 * <code>address</code> which is servicing the <code>protocol</code> protocol, -241 * with the <code>ticket</code> credentials, returning the value. -242 * Throws exceptions if there are network problems or if the remote code -243 * threw an exception. -244 * -245 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. -246 * {@link UserProvider#getCurrent()} makes a new instance of User each time so -247 * will be a -248 * new Connection each time. -249 * @return A pair with the Message response and the Cell data (if any). -250 * @throws InterruptedException -251 * @throws java.io.IOException -252 */ -253 protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, -254 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, -255 InetSocketAddress isa, MetricsConnection.CallStats callStats) -256 throws IOException, InterruptedException; -257 -258 @Override -259 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, -260 int defaultOperationTimeout) { -261 return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); -262 } -263 -264 /** -265 * Takes an Exception and the address we were trying to connect to and return an IOException with -266 * the input exception as the cause. The new exception provides the stack trace of the place where -267 * the exception is thrown and some extra diagnostics information. If the exception is -268 * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return -269 * an IOException. -270 * @param addr target address -271 * @param exception the relevant exception -272 * @return an exception to throw -273 */ -274 protected IOException wrapException(InetSocketAddress addr, Exception exception) { -275 if (exception instanceof ConnectException) { -276 // connection refused; include the host:port in the error -277 return (ConnectException) new ConnectException("Call to " + addr -278 + " failed on connection exception: " + exception).initCause(exception); -279 } else if (exception instanceof SocketTimeoutException) { -280 return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr -281 + " failed because " + exception).initCause(exception); -282 } else if (exception instanceof ConnectionClosingException) { -283 return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr -284 + " failed on local exception: " + exception).initCause(exception); -285 } else { -286 return (IOException) new IOException("Call to " + addr + " failed on local exception: " -287 + exception).initCause(exception); -288 } -289 } -290 -291 /** -292 * Blocking rpc channel that goes via hbase rpc. -293 */ -294 @VisibleForTesting -295 public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { -296 private final InetSocketAddress isa; -297 private final AbstractRpcClient rpcClient; -298 private final User ticket; -299 private final int channelOperationTimeout; -300 -301 /** -302 * @param channelOperationTimeout - the default timeout when no timeout is given -303 */ -304 protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient, -305 final ServerName sn, final User ticket, int channelOperationTimeout) { -306 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); -307 this.rpcClient = rpcClient; -308 this.ticket = ticket; -309 this.channelOperationTimeout = channelOperationTimeout; -310 } -311 -312 @Override -313 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, -314 Message param, Message returnType) throws ServiceException { -315 PayloadCarryingRpcController pcrc; -316 if (controller != null) { -317 pcrc = (PayloadCarryingRpcController) controller; -318 if (!pcrc.hasCallTimeout()) { -319 pcrc.setCallTimeout(channelOperationTimeout); -320 } -321 } else { -322 pcrc = new PayloadCarryingRpcController(); -323 pcrc.setCallTimeout(channelOperationTimeout); -324 } -325 -326 return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); -327 } -328 } -329} +152 @Override +153 public boolean hasCellBlockSupport() { +154 return this.codec != null; +155 } +156 +157 /** +158 * Encapsulate the ugly casting and RuntimeException conversion in private method. +159 * @param conf configuration +160 * @return The compressor to use on this client. +161 */ +162 private static CompressionCodec getCompressor(final Configuration conf) { +163 String className = conf.get("hbase.client.rpc.compressor", null); +164 if (className == null || className.isEmpty()) return null; +165 try { +166 return (CompressionCodec)Class.forName(className).newInstance(); +167 } catch (Exception e) { +168 throw new RuntimeException("Failed getting compressor " + className, e); +169 } +170 } +171 +172 /** +173 * Return the pool type specified in the configuration, which must be set to +174 * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or +175 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, +176 * otherwise default to the former. +177 * +178 * For applications with many user threads, use a small round-robin pool. For +179 * applications with few user threads, you may want to try using a +180 * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} +181 * instances should not exceed the operating system's hard limit on the number of +182 * connections. +183 * +184 * @param config configuration +185 * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or +186 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} +187 */ +188 protected static PoolMap.PoolType getPoolType(Configuration config) { +189 return PoolMap.PoolType +190 .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin, +191 PoolMap.PoolType.ThreadLocal); +192 } +193 +194 /** +195 * Return the pool size specified in the configuration, which is applicable only if +196 * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. +197 * +198 * @param config configuration +199 * @return the maximum pool size +200 */ +201 protected static int getPoolSize(Configuration config) { +202 return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); +203 } +204 +205 /** +206 * Make a blocking call. Throws exceptions if there are network problems or if the remote code +207 * threw an exception. +208 * +209 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. +210 * {@link UserProvider#getCurrent()} makes a new instance of User each time so +211 * will be a +212 * new Connection each time. +213 * @return A pair with the Message response and the Cell data (if any). +214 */ +215 Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, +216 Message param, Message returnType, final User ticket, final InetSocketAddress isa) +217 throws ServiceException { +218 if (pcrc == null) { +219 pcrc = new PayloadCarryingRpcController(); +220 } +221 +222 Pair<Message, CellScanner> val; +223 try { +224 final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); +225 cs.setStartTime(EnvironmentEdgeManager.currentTime()); +226 val = call(pcrc, md, param, returnType, ticket, isa, cs); +227 // Shove the results into controller so can be carried across the proxy/pb service void. +228 pcrc.setCellScanner(val.getSecond()); +229 +230 cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); +231 if (metrics != null) { +232 metrics.updateRpc(md, param, cs); +233 } +234 if (LOG.isTraceEnabled()) { +235 LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); +236 } +237 return val.getFirst(); +238 } catch (Throwable e) { +239 throw new ServiceException(e); +240 } +241 } +242 +243 /** +244 * Make a call, passing <code>param</code>, to the IPC server running at +245 * <code>address</code> which is servicing the <code>protocol</code> protocol, +246 * with the <code>ticket</code> credentials, returning the value. +247 * Throws exceptions if there are network problems or if the remote code +248 * threw an exception. +249 * +250 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. +251 * {@link UserProvider#getCurrent()} makes a new instance of User each time so +252 * will be a +253 * new Connection each time. +254 * @return A pair with the Message response and the Cell data (if any). +255 * @throws InterruptedException +256 * @throws java.io.IOException +257 */ +258 protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, +259 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, +260 InetSocketAddress isa, MetricsConnection.CallStats callStats) +261 throws IOException, InterruptedException; +262 +263 @Override +264 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, +265 int defaultOperationTimeout) { +266 return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); +267 } +268 +269 /** +270 * Takes an Exception and the address we were trying to connect to and return an IOException with +271 * the input exception as the cause. The new exception provides the stack trace of the place where +272 * the exception is thrown and some extra diagnostics information. If the exception is +273 * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return +274 * an IOException. +275 * @param addr target address +276 * @param exception the relevant exception +277 * @return an exception to throw +278 */ +279 protected IOException wrapException(InetSocketAddress addr, Exception exception) { +280 if (exception instanceof ConnectException) { +281 // connection refused; include the host:port in the error +282 return (ConnectException) new ConnectException("Call to " + addr +283 + " failed on connection exception: " + exception).initCause(exception); +284 } else if (exception instanceof SocketTimeoutException) { +285 return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr +286 + " failed because " + exception).initCause(exception); +287 } else if (exception instanceof ConnectionClosingException) { +288 return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr +289 + " failed on local exception: " + exception).initCause(exception); +290 } else { +291 return (IOException) new IOException("Call to " + addr + " failed on local exception: " +292 + exception).initCause(exception); +293 } +294 } +295 +296 /** +297 * Blocking rpc channel that goes via hbase rpc. +298 */ +299 @VisibleForTesting +300 public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { +301 private final InetSocketAddress isa; +302 private final AbstractRpcClient rpcClient; +303 private final User ticket; +304 private final int channelOperationTimeout; +305 +306 /** +307 * @param channelOperationTimeout - the default timeout when no timeout is given +308 */ +309 protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient, +310 final ServerName sn, final User ticket, int channelOperationTimeout) { +311 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); +312 this.rpcClient = rpcClient; +313 this.ticket = ticket; +314 this.channelOperationTimeout = channelOperationTimeout; +315 } +316 +317 @Override +318 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, +319 Message param, Message returnType) throws ServiceException { +320 PayloadCarryingRpcController pcrc; +321 if (controller != null) { +322 pcrc = (PayloadCarryingRpcController) controller; +323 if (!pcrc.hasCallTimeout()) { +324 pcrc.setCallTimeout(channelOperationTimeout); +325 } +326 } else { +327 pcrc = new PayloadCarryingRpcController(); +328 pcrc.setCallTimeout(channelOperationTimeout); +329 } +330 +331 return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); +332 } +333 } +334} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/8bb348c6/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClient.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClient.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClient.html index 7d4db73..016609c 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClient.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClient.html @@ -91,7 +91,13 @@ 083 * using this client. 084 */ 085 @Override public void close(); -086} +086 +087 /** +088 * @return true when this client uses a {@link org.apache.hadoop.hbase.codec.Codec} and so +089 * supports cell blocks. +090 */ +091 boolean hasCellBlockSupport(); +092} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/8bb348c6/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html index 5344a96..6c2b2de 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html @@ -274,7 +274,7 @@ 266 } 267 List<InputSplit> splits = new ArrayList<InputSplit>(1); 268 long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); -269 TableSplit split = new TableSplit(tableName, +269 TableSplit split = new TableSplit(tableName, scan, 270 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc 271 .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); 272 splits.add(split); @@ -317,7 +317,7 @@ 309 310 byte[] regionName = location.getRegionInfo().getRegionName(); 311 long regionSize = sizeCalculator.getRegionSize(regionName); -312 TableSplit split = new TableSplit(tableName, +312 TableSplit split = new TableSplit(tableName, scan, 313 splitStart, splitStop, regionLocation, regionSize); 314 splits.add(split); 315 if (LOG.isDebugEnabled()) { @@ -405,9 +405,9 @@ 397 byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey); 398 //Set the size of child TableSplit as 1/2 of the region size. The exact size of the 399 // MapReduce input splits is not far off. -400 TableSplit t1 = new TableSplit(tableName, ts.getStartRow(), splitKey, regionLocation, +400 TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey, regionLocation, 401 regionSize / 2); -402 TableSplit t2 = new TableSplit(tableName, splitKey, ts.getEndRow(), regionLocation, +402 TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation, 403 regionSize - regionSize / 2); 404 resultList.add(t1); 405 resultList.add(t2); @@ -434,7 +434,7 @@ 426 break; 427 } 428 } -429 TableSplit t = new TableSplit(tableName, splitStartKey, splitEndKey, +429 TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, 430 regionLocation, totalSize); 431 resultList.add(t); 432 } http://git-wip-us.apache.org/repos/asf/hbase-site/blob/8bb348c6/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html index 4bcfbd0..a5703ad 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html @@ -569,7 +569,7 @@ 561 * @return The scan saved in a Base64 encoded string. 562 * @throws IOException When writing the scan fails. 563 */ -564 static String convertScanToString(Scan scan) throws IOException { +564 public static String convertScanToString(Scan scan) throws IOException { 565 ClientProtos.Scan proto = ProtobufUtil.toScan(scan); 566 return Base64.encodeBytes(proto.toByteArray()); 567 } @@ -581,7 +581,7 @@ 573 * @return The newly created Scan instance. 574 * @throws IOException When reading the scan instance fails. 575 */ -576 static Scan convertStringToScan(String base64) throws IOException { +576 public static Scan convertStringToScan(String base64) throws IOException { 577 byte [] decoded = Base64.decode(base64); 578 ClientProtos.Scan scan; 579 try {