Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3BEF1200C6F for ; Tue, 9 May 2017 17:00:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3A6DE160BCC; Tue, 9 May 2017 15:00:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 35B19160BB6 for ; Tue, 9 May 2017 17:00:32 +0200 (CEST) Received: (qmail 77848 invoked by uid 500); 9 May 2017 15:00:25 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 73918 invoked by uid 99); 9 May 2017 15:00:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 May 2017 15:00:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 94D0AF2173; Tue, 9 May 2017 15:00:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Tue, 09 May 2017 15:00:44 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [26/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Tue, 09 May 2017 15:00:34 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/7ef4c5a9/devapidocs/src-html/org/apache/hadoop/hbase/ipc/NettyRpcServer.MessageDecoder.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/NettyRpcServer.MessageDecoder.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/NettyRpcServer.MessageDecoder.html index 27e0dee..109b5f3 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/NettyRpcServer.MessageDecoder.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/NettyRpcServer.MessageDecoder.html @@ -67,157 +67,157 @@ 059import org.apache.hadoop.hbase.CellScanner; 060import org.apache.hadoop.hbase.HConstants; 061import org.apache.hadoop.hbase.Server; -062import org.apache.hadoop.hbase.classification.InterfaceStability; -063import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -064import org.apache.hadoop.hbase.nio.ByteBuff; -065import org.apache.hadoop.hbase.nio.SingleByteBuff; -066import org.apache.hadoop.hbase.security.AccessDeniedException; -067import org.apache.hadoop.hbase.security.AuthMethod; -068import org.apache.hadoop.hbase.security.HBasePolicyProvider; -069import org.apache.hadoop.hbase.security.SaslStatus; -070import org.apache.hadoop.hbase.security.SaslUtil; -071import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; -072import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; -073import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; -074import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -075import org.apache.hadoop.hbase.util.Bytes; -076import org.apache.hadoop.hbase.util.JVM; -077import org.apache.hadoop.hbase.util.Pair; -078import org.apache.hadoop.io.IntWritable; -079import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; -080import org.apache.htrace.TraceInfo; -081 -082/** -083 * An RPC server with Netty4 implementation. -084 * -085 */ -086public class NettyRpcServer extends RpcServer { -087 -088 public static final Log LOG = LogFactory.getLog(NettyRpcServer.class); -089 -090 protected final InetSocketAddress bindAddress; -091 -092 private final CountDownLatch closed = new CountDownLatch(1); -093 private final Channel serverChannel; -094 private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);; -095 -096 public NettyRpcServer(final Server server, final String name, -097 final List<BlockingServiceAndInterface> services, -098 final InetSocketAddress bindAddress, Configuration conf, -099 RpcScheduler scheduler) throws IOException { -100 super(server, name, services, bindAddress, conf, scheduler); -101 this.bindAddress = bindAddress; -102 boolean useEpoll = useEpoll(conf); -103 int workerCount = conf.getInt("hbase.netty.rpc.server.worker.count", -104 Runtime.getRuntime().availableProcessors() / 4); -105 EventLoopGroup bossGroup = null; -106 EventLoopGroup workerGroup = null; -107 if (useEpoll) { -108 bossGroup = new EpollEventLoopGroup(1); -109 workerGroup = new EpollEventLoopGroup(workerCount); -110 } else { -111 bossGroup = new NioEventLoopGroup(1); -112 workerGroup = new NioEventLoopGroup(workerCount); -113 } -114 ServerBootstrap bootstrap = new ServerBootstrap(); -115 bootstrap.group(bossGroup, workerGroup); -116 if (useEpoll) { -117 bootstrap.channel(EpollServerSocketChannel.class); -118 } else { -119 bootstrap.channel(NioServerSocketChannel.class); -120 } -121 bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay); -122 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive); -123 bootstrap.childOption(ChannelOption.ALLOCATOR, -124 PooledByteBufAllocator.DEFAULT); -125 bootstrap.childHandler(new Initializer(maxRequestSize)); -126 -127 try { -128 serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); -129 LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress() -130 + ", hbase.netty.rpc.server.worker.count=" + workerCount -131 + ", useEpoll=" + useEpoll); -132 allChannels.add(serverChannel); -133 } catch (InterruptedException e) { -134 throw new InterruptedIOException(e.getMessage()); -135 } -136 initReconfigurable(conf); -137 this.scheduler.init(new RpcSchedulerContext(this)); -138 } -139 -140 private static boolean useEpoll(Configuration conf) { -141 // Config to enable native transport. -142 boolean epollEnabled = conf.getBoolean("hbase.rpc.server.nativetransport", -143 true); -144 // Use the faster native epoll transport mechanism on linux if enabled -145 return epollEnabled && JVM.isLinux() && JVM.isAmd64(); -146 } -147 -148 @Override -149 public synchronized void start() { -150 if (started) { -151 return; -152 } -153 authTokenSecretMgr = createSecretManager(); -154 if (authTokenSecretMgr != null) { -155 setSecretManager(authTokenSecretMgr); -156 authTokenSecretMgr.start(); -157 } -158 this.authManager = new ServiceAuthorizationManager(); -159 HBasePolicyProvider.init(conf, authManager); -160 scheduler.start(); -161 started = true; -162 } -163 -164 @Override -165 public synchronized void stop() { -166 if (!running) { -167 return; -168 } -169 LOG.info("Stopping server on " + this.bindAddress.getPort()); -170 if (authTokenSecretMgr != null) { -171 authTokenSecretMgr.stop(); -172 authTokenSecretMgr = null; -173 } -174 allChannels.close().awaitUninterruptibly(); -175 serverChannel.close(); -176 scheduler.stop(); -177 closed.countDown(); -178 running = false; -179 } -180 -181 @Override -182 public synchronized void join() throws InterruptedException { -183 closed.await(); -184 } -185 -186 @Override -187 public synchronized InetSocketAddress getListenerAddress() { -188 return ((InetSocketAddress) serverChannel.localAddress()); -189 } -190 -191 public class NettyConnection extends RpcServer.Connection { -192 -193 protected Channel channel; -194 -195 NettyConnection(Channel channel) { -196 super(); -197 this.channel = channel; -198 InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress()); -199 this.addr = inetSocketAddress.getAddress(); -200 if (addr == null) { -201 this.hostAddress = "*Unknown*"; -202 } else { -203 this.hostAddress = inetSocketAddress.getAddress().getHostAddress(); -204 } -205 this.remotePort = inetSocketAddress.getPort(); -206 this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, -207 0, null, null, 0, null); -208 this.setConnectionHeaderResponseCall = new Call( -209 CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, -210 this, 0, null, null, 0, null); -211 this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, -212 null, null, null, this, 0, null, null, 0, null); +062import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +063import org.apache.hadoop.hbase.nio.ByteBuff; +064import org.apache.hadoop.hbase.nio.SingleByteBuff; +065import org.apache.hadoop.hbase.security.AccessDeniedException; +066import org.apache.hadoop.hbase.security.AuthMethod; +067import org.apache.hadoop.hbase.security.HBasePolicyProvider; +068import org.apache.hadoop.hbase.security.SaslStatus; +069import org.apache.hadoop.hbase.security.SaslUtil; +070import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +071import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +072import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +073import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +074import org.apache.hadoop.hbase.util.Bytes; +075import org.apache.hadoop.hbase.util.JVM; +076import org.apache.hadoop.hbase.util.Pair; +077import org.apache.hadoop.io.IntWritable; +078import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; +079import org.apache.htrace.TraceInfo; +080 +081/** +082 * An RPC server with Netty4 implementation. +083 * +084 */ +085public class NettyRpcServer extends RpcServer { +086 +087 public static final Log LOG = LogFactory.getLog(NettyRpcServer.class); +088 +089 protected final InetSocketAddress bindAddress; +090 +091 private final CountDownLatch closed = new CountDownLatch(1); +092 private final Channel serverChannel; +093 private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);; +094 +095 public NettyRpcServer(final Server server, final String name, +096 final List<BlockingServiceAndInterface> services, +097 final InetSocketAddress bindAddress, Configuration conf, +098 RpcScheduler scheduler) throws IOException { +099 super(server, name, services, bindAddress, conf, scheduler); +100 this.bindAddress = bindAddress; +101 boolean useEpoll = useEpoll(conf); +102 int workerCount = conf.getInt("hbase.netty.rpc.server.worker.count", +103 Runtime.getRuntime().availableProcessors() / 4); +104 EventLoopGroup bossGroup = null; +105 EventLoopGroup workerGroup = null; +106 if (useEpoll) { +107 bossGroup = new EpollEventLoopGroup(1); +108 workerGroup = new EpollEventLoopGroup(workerCount); +109 } else { +110 bossGroup = new NioEventLoopGroup(1); +111 workerGroup = new NioEventLoopGroup(workerCount); +112 } +113 ServerBootstrap bootstrap = new ServerBootstrap(); +114 bootstrap.group(bossGroup, workerGroup); +115 if (useEpoll) { +116 bootstrap.channel(EpollServerSocketChannel.class); +117 } else { +118 bootstrap.channel(NioServerSocketChannel.class); +119 } +120 bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay); +121 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive); +122 bootstrap.childOption(ChannelOption.ALLOCATOR, +123 PooledByteBufAllocator.DEFAULT); +124 bootstrap.childHandler(new Initializer(maxRequestSize)); +125 +126 try { +127 serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); +128 LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress() +129 + ", hbase.netty.rpc.server.worker.count=" + workerCount +130 + ", useEpoll=" + useEpoll); +131 allChannels.add(serverChannel); +132 } catch (InterruptedException e) { +133 throw new InterruptedIOException(e.getMessage()); +134 } +135 initReconfigurable(conf); +136 this.scheduler.init(new RpcSchedulerContext(this)); +137 } +138 +139 private static boolean useEpoll(Configuration conf) { +140 // Config to enable native transport. +141 boolean epollEnabled = conf.getBoolean("hbase.rpc.server.nativetransport", +142 true); +143 // Use the faster native epoll transport mechanism on linux if enabled +144 return epollEnabled && JVM.isLinux() && JVM.isAmd64(); +145 } +146 +147 @Override +148 public synchronized void start() { +149 if (started) { +150 return; +151 } +152 authTokenSecretMgr = createSecretManager(); +153 if (authTokenSecretMgr != null) { +154 setSecretManager(authTokenSecretMgr); +155 authTokenSecretMgr.start(); +156 } +157 this.authManager = new ServiceAuthorizationManager(); +158 HBasePolicyProvider.init(conf, authManager); +159 scheduler.start(); +160 started = true; +161 } +162 +163 @Override +164 public synchronized void stop() { +165 if (!running) { +166 return; +167 } +168 LOG.info("Stopping server on " + this.bindAddress.getPort()); +169 if (authTokenSecretMgr != null) { +170 authTokenSecretMgr.stop(); +171 authTokenSecretMgr = null; +172 } +173 allChannels.close().awaitUninterruptibly(); +174 serverChannel.close(); +175 scheduler.stop(); +176 closed.countDown(); +177 running = false; +178 } +179 +180 @Override +181 public synchronized void join() throws InterruptedException { +182 closed.await(); +183 } +184 +185 @Override +186 public synchronized InetSocketAddress getListenerAddress() { +187 return ((InetSocketAddress) serverChannel.localAddress()); +188 } +189 +190 public class NettyConnection extends RpcServer.Connection { +191 +192 protected Channel channel; +193 +194 NettyConnection(Channel channel) { +195 super(); +196 this.channel = channel; +197 InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress()); +198 this.addr = inetSocketAddress.getAddress(); +199 if (addr == null) { +200 this.hostAddress = "*Unknown*"; +201 } else { +202 this.hostAddress = inetSocketAddress.getAddress().getHostAddress(); +203 } +204 this.remotePort = inetSocketAddress.getPort(); +205 this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null, +206 null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); +207 this.setConnectionHeaderResponseCall = +208 new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, this, +209 0, null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); +210 this.authFailedCall = +211 new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, 0, +212 null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); 213 } 214 215 void readPreamble(ByteBuf buffer) throws IOException { @@ -251,7 +251,7 @@ 243 AccessDeniedException ae = new AccessDeniedException( 244 "Authentication is required"); 245 setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); -246 ((Call) authFailedCall) +246 ((NettyServerCall) authFailedCall) 247 .sendResponseIfReady(ChannelFutureListener.CLOSE); 248 return; 249 } @@ -277,8 +277,8 @@ 269 270 private void doBadPreambleHandling(final String msg, final Exception e) throws IOException { 271 LOG.warn(msg); -272 Call fakeCall = new Call(-1, null, null, null, null, null, this, -1, -273 null, null, 0, null); +272 NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1, +273 null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); 274 setupResponse(null, fakeCall, e, msg); 275 // closes out the connection. 276 fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE); @@ -344,208 +344,163 @@ 336 } 337 338 @Override -339 public RpcServer.Call createCall(int id, final BlockingService service, +339 public ServerCall createCall(int id, final BlockingService service, 340 final MethodDescriptor md, RequestHeader header, Message param, 341 CellScanner cellScanner, RpcServer.Connection connection, long size, 342 TraceInfo tinfo, final InetAddress remoteAddress, int timeout, 343 CallCleanup reqCleanup) { -344 return new Call(id, service, md, header, param, cellScanner, connection, -345 size, tinfo, remoteAddress, timeout, reqCleanup); -346 } -347 } -348 -349 /** -350 * Datastructure that holds all necessary to a method invocation and then afterward, carries the -351 * result. -352 */ -353 @InterfaceStability.Evolving -354 public class Call extends RpcServer.Call { -355 -356 Call(int id, final BlockingService service, final MethodDescriptor md, -357 RequestHeader header, Message param, CellScanner cellScanner, -358 RpcServer.Connection connection, long size, TraceInfo tinfo, -359 final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { -360 super(id, service, md, header, param, cellScanner, -361 connection, size, tinfo, remoteAddress, timeout, reqCleanup); -362 } -363 -364 @Override -365 public long disconnectSince() { -366 if (!getConnection().isConnectionOpen()) { -367 return System.currentTimeMillis() - timestamp; -368 } else { -369 return -1L; -370 } -371 } +344 return new NettyServerCall(id, service, md, header, param, cellScanner, connection, size, +345 tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder, +346 reqCleanup); +347 } +348 } +349 +350 private class Initializer extends ChannelInitializer<SocketChannel> { +351 +352 final int maxRequestSize; +353 +354 Initializer(int maxRequestSize) { +355 this.maxRequestSize = maxRequestSize; +356 } +357 +358 @Override +359 protected void initChannel(SocketChannel channel) throws Exception { +360 ChannelPipeline pipeline = channel.pipeline(); +361 pipeline.addLast("header", new ConnectionHeaderHandler()); +362 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder( +363 maxRequestSize, 0, 4, 0, 4, true)); +364 pipeline.addLast("decoder", new MessageDecoder()); +365 pipeline.addLast("encoder", new MessageEncoder()); +366 } +367 +368 } +369 +370 private class ConnectionHeaderHandler extends ByteToMessageDecoder { +371 private NettyConnection connection; 372 -373 NettyConnection getConnection() { -374 return (NettyConnection) this.connection; -375 } -376 -377 /** -378 * If we have a response, and delay is not set, then respond immediately. Otherwise, do not -379 * respond to client. This is called by the RPC code in the context of the Handler thread. -380 */ -381 @Override -382 public synchronized void sendResponseIfReady() throws IOException { -383 getConnection().channel.writeAndFlush(this); -384 } -385 -386 public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException { -387 getConnection().channel.writeAndFlush(this).addListener(listener); -388 } -389 -390 } -391 -392 private class Initializer extends ChannelInitializer<SocketChannel> { -393 -394 final int maxRequestSize; -395 -396 Initializer(int maxRequestSize) { -397 this.maxRequestSize = maxRequestSize; -398 } -399 -400 @Override -401 protected void initChannel(SocketChannel channel) throws Exception { -402 ChannelPipeline pipeline = channel.pipeline(); -403 pipeline.addLast("header", new ConnectionHeaderHandler()); -404 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder( -405 maxRequestSize, 0, 4, 0, 4, true)); -406 pipeline.addLast("decoder", new MessageDecoder()); -407 pipeline.addLast("encoder", new MessageEncoder()); -408 } -409 -410 } -411 -412 private class ConnectionHeaderHandler extends ByteToMessageDecoder { -413 private NettyConnection connection; -414 -415 ConnectionHeaderHandler() { +373 ConnectionHeaderHandler() { +374 } +375 +376 @Override +377 protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, +378 List<Object> out) throws Exception { +379 if (byteBuf.readableBytes() < 6) { +380 return; +381 } +382 connection = new NettyConnection(ctx.channel()); +383 connection.readPreamble(byteBuf); +384 ((MessageDecoder) ctx.pipeline().get("decoder")) +385 .setConnection(connection); +386 ctx.pipeline().remove(this); +387 } +388 +389 } +390 +391 private class MessageDecoder extends ChannelInboundHandlerAdapter { +392 +393 private NettyConnection connection; +394 +395 void setConnection(NettyConnection connection) { +396 this.connection = connection; +397 } +398 +399 @Override +400 public void channelActive(ChannelHandlerContext ctx) throws Exception { +401 allChannels.add(ctx.channel()); +402 if (LOG.isDebugEnabled()) { +403 LOG.debug("Connection from " + ctx.channel().remoteAddress() +404 + "; # active connections: " + getNumOpenConnections()); +405 } +406 super.channelActive(ctx); +407 } +408 +409 @Override +410 public void channelRead(ChannelHandlerContext ctx, Object msg) +411 throws Exception { +412 ByteBuf input = (ByteBuf) msg; +413 // 4 bytes length field +414 metrics.receivedBytes(input.readableBytes() + 4); +415 connection.process(input); 416 } 417 418 @Override -419 protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, -420 List<Object> out) throws Exception { -421 if (byteBuf.readableBytes() < 6) { -422 return; -423 } -424 connection = new NettyConnection(ctx.channel()); -425 connection.readPreamble(byteBuf); -426 ((MessageDecoder) ctx.pipeline().get("decoder")) -427 .setConnection(connection); -428 ctx.pipeline().remove(this); -429 } -430 -431 } -432 -433 private class MessageDecoder extends ChannelInboundHandlerAdapter { -434 -435 private NettyConnection connection; -436 -437 void setConnection(NettyConnection connection) { -438 this.connection = connection; -439 } -440 -441 @Override -442 public void channelActive(ChannelHandlerContext ctx) throws Exception { -443 allChannels.add(ctx.channel()); -444 if (LOG.isDebugEnabled()) { -445 LOG.debug("Connection from " + ctx.channel().remoteAddress() -446 + "; # active connections: " + getNumOpenConnections()); -447 } -448 super.channelActive(ctx); -449 } +419 public void channelInactive(ChannelHandlerContext ctx) throws Exception { +420 allChannels.remove(ctx.channel()); +421 if (LOG.isDebugEnabled()) { +422 LOG.debug("Disconnecting client: " + ctx.channel().remoteAddress() +423 + ". Number of active connections: " + getNumOpenConnections()); +424 } +425 super.channelInactive(ctx); +426 } +427 +428 @Override +429 public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) { +430 allChannels.remove(ctx.channel()); +431 if (LOG.isDebugEnabled()) { +432 LOG.debug("Connection from " + ctx.channel().remoteAddress() +433 + " catch unexpected exception from downstream.", e.getCause()); +434 } +435 ctx.channel().close(); +436 } +437 +438 } +439 +440 private class MessageEncoder extends ChannelOutboundHandlerAdapter { +441 +442 @Override +443 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { +444 final NettyServerCall call = (NettyServerCall) msg; +445 ByteBuf response = Unpooled.wrappedBuffer(call.response.getBuffers()); +446 ctx.write(response, promise).addListener(new CallWriteListener(call)); +447 } +448 +449 } 450 -451 @Override -452 public void channelRead(ChannelHandlerContext ctx, Object msg) -453 throws Exception { -454 ByteBuf input = (ByteBuf) msg; -455 // 4 bytes length field -456 metrics.receivedBytes(input.readableBytes() + 4); -457 connection.process(input); -458 } -459 -460 @Override -461 public void channelInactive(ChannelHandlerContext ctx) throws Exception { -462 allChannels.remove(ctx.channel()); -463 if (LOG.isDebugEnabled()) { -464 LOG.debug("Disconnecting client: " + ctx.channel().remoteAddress() -465 + ". Number of active connections: " + getNumOpenConnections()); -466 } -467 super.channelInactive(ctx); -468 } -469 -470 @Override -471 public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) { -472 allChannels.remove(ctx.channel()); -473 if (LOG.isDebugEnabled()) { -474 LOG.debug("Connection from " + ctx.channel().remoteAddress() -475 + " catch unexpected exception from downstream.", e.getCause()); -476 } -477 ctx.channel().close(); -478 } -479 -480 } -481 -482 private class MessageEncoder extends ChannelOutboundHandlerAdapter { -483 -484 @Override -485 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { -486 final Call call = (Call) msg; -487 ByteBuf response = Unpooled.wrappedBuffer(call.response.getBuffers()); -488 ctx.write(response, promise).addListener(new CallWriteListener(call)); -489 } -490 -491 } -492 -493 private class CallWriteListener implements ChannelFutureListener { -494 -495 private Call call; -496 -497 CallWriteListener(Call call) { -498 this.call = call; -499 } -500 -501 @Override -502 public void operationComplete(ChannelFuture future) throws Exception { -503 call.done(); -504 if (future.isSuccess()) { -505 metrics.sentBytes(call.response.size()); -506 } -507 } -508 -509 } -510 -511 @Override -512 public void setSocketSendBufSize(int size) { -513 } -514 -515 @Override -516 public int getNumOpenConnections() { -517 // allChannels also contains the server channel, so exclude that from the count. -518 return allChannels.size() - 1; -519 } -520 -521 @Override -522 public Pair<Message, CellScanner> call(BlockingService service, -523 MethodDescriptor md, Message param, CellScanner cellScanner, -524 long receiveTime, MonitoredRPCHandler status) throws IOException { -525 return call(service, md, param, cellScanner, receiveTime, status, -526 System.currentTimeMillis(), 0); -527 } -528 -529 @Override -530 public Pair<Message, CellScanner> call(BlockingService service, -531 MethodDescriptor md, Message param, CellScanner cellScanner, -532 long receiveTime, MonitoredRPCHandler status, long startTime, int timeout) -533 throws IOException { -534 Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null, -535 -1, null, null, timeout, null); -536 fakeCall.setReceiveTime(receiveTime); -537 return call(fakeCall, status); -538 } -539 -540} +451 private class CallWriteListener implements ChannelFutureListener { +452 +453 private NettyServerCall call; +454 +455 CallWriteListener(NettyServerCall call) { +456 this.call = call; +457 } +458 +459 @Override +460 public void operationComplete(ChannelFuture future) throws Exception { +461 call.done(); +462 if (future.isSuccess()) { +463 metrics.sentBytes(call.response.size()); +464 } +465 } +466 +467 } +468 +469 @Override +470 public void setSocketSendBufSize(int size) { +471 } +472 +473 @Override +474 public int getNumOpenConnections() { +475 // allChannels also contains the server channel, so exclude that from the count. +476 return allChannels.size() - 1; +477 } +478 +479 @Override +480 public Pair<Message, CellScanner> call(BlockingService service, +481 MethodDescriptor md, Message param, CellScanner cellScanner, +482 long receiveTime, MonitoredRPCHandler status) throws IOException { +483 return call(service, md, param, cellScanner, receiveTime, status, +484 System.currentTimeMillis(), 0); +485 } +486 +487 @Override +488 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, +489 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, +490 long startTime, int timeout) throws IOException { +491 NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null, +492 -1, null, null, receiveTime, timeout, reservoir, cellBlockBuilder, null); +493 return call(fakeCall, status); +494 } +495}