From dev-return-78103-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Tue Feb 5 14:53:15 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 497C918067A for ; Tue, 5 Feb 2019 15:53:15 +0100 (CET) Received: (qmail 47700 invoked by uid 500); 5 Feb 2019 14:53:14 -0000 Mailing-List: contact dev-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list dev@zookeeper.apache.org Received: (qmail 47689 invoked by uid 99); 5 Feb 2019 14:53:14 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Feb 2019 14:53:14 +0000 From: GitBox To: dev@zookeeper.apache.org Subject: [GitHub] anmolnar commented on a change in pull request #753: ZOOKEEPER-3204: Reconfig tests are constantly failing on 3.5 after applying Java 11 fix Message-ID: <154937839373.27095.5165278624167028108.gitbox@gitbox.apache.org> Date: Tue, 05 Feb 2019 14:53:13 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit anmolnar commented on a change in pull request #753: ZOOKEEPER-3204: Reconfig tests are constantly failing on 3.5 after applying Java 11 fix URL: https://github.com/apache/zookeeper/pull/753#discussion_r253896039 ########## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ########## @@ -61,215 +26,184 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; -import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; -import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer; +import javax.net.ssl.SSLContext; +import 
javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.net.ssl.X509KeyManager; +import javax.net.ssl.X509TrustManager; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.DefaultEventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.common.ClientX509Util; +import org.apache.zookeeper.common.NettyUtils; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.common.X509Exception.SSLContextException; +import org.apache.zookeeper.server.auth.ProviderRegistry; +import org.apache.zookeeper.server.auth.X509AuthenticationProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class NettyServerCnxnFactory extends ServerCnxnFactory { private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class); - ServerBootstrap bootstrap; - Channel parentChannel; - ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns"); - HashMap> ipMap = - new HashMap>( ); - InetSocketAddress 
localAddress; - int maxClientCnxns = 60; - ClientX509Util x509Util; + private final ServerBootstrap bootstrap; + private Channel parentChannel; + private final ChannelGroup allChannels = + new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor()); + // Access to ipMap or to any Set contained in the map needs to be + // protected with synchronized (ipMap) { ... } + private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<>(); + private InetSocketAddress localAddress; + private int maxClientCnxns = 60; + private final ClientX509Util x509Util; + + private static final AttributeKey<NettyServerCnxn> CONNECTION_ATTRIBUTE = + AttributeKey.valueOf("NettyServerCnxn"); + + private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR = + new AtomicReference<>(null); /** - * This is an inner class since we need to extend SimpleChannelHandler, but + * This is an inner class since we need to extend ChannelDuplexHandler, but * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner * this class gets access to the member variables and methods. 
*/ @Sharable - class CnxnChannelHandler extends SimpleChannelHandler { + class CnxnChannelHandler extends ChannelDuplexHandler { @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception - { + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { - LOG.trace("Channel closed " + e); + LOG.trace("Channel active {}", ctx.channel()); } - allChannels.remove(ctx.getChannel()); - } - @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception - { - if (LOG.isTraceEnabled()) { - LOG.trace("Channel connected " + e); - } - - NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(), + NettyServerCnxn cnxn = new NettyServerCnxn(ctx.channel(), zkServer, NettyServerCnxnFactory.this); - ctx.setAttachment(cnxn); + ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { - SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); - ChannelFuture handshakeFuture = sslHandler.handshake(); + SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); + Future<Channel> handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { - allChannels.add(ctx.getChannel()); + allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override - public void channelDisconnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception - { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { - LOG.trace("Channel disconnected " + e); + LOG.trace("Channel inactive {}", ctx.channel()); } - NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); + allChannels.remove(ctx.channel()); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { - LOG.trace("Channel disconnect caused close " + e); + LOG.trace("Channel inactive caused close {}", cnxn); } 
cnxn.close(); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception - { - LOG.warn("Exception caught " + e, e.getCause()); - NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.warn("Exception caught", cause); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Closing " + cnxn); + LOG.debug("Closing {}", cnxn); } cnxn.close(); } } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception - { - if (LOG.isTraceEnabled()) { - LOG.trace("message received called " + e.getMessage()); - } + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { - if (LOG.isDebugEnabled()) { - LOG.debug("New message " + e.toString() - + " from " + ctx.getChannel()); - } - NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); - synchronized(cnxn) { - processMessage(e, cnxn); + if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { + LOG.debug("Received AutoReadEvent.ENABLE"); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); + // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services