From dev-return-78012-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Fri Feb 1 22:46:00 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id B8A8D18067E for ; Fri, 1 Feb 2019 23:45:59 +0100 (CET) Received: (qmail 65238 invoked by uid 500); 1 Feb 2019 22:45:55 -0000 Mailing-List: contact dev-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list dev@zookeeper.apache.org Received: (qmail 64748 invoked by uid 99); 1 Feb 2019 22:45:55 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Feb 2019 22:45:55 +0000 From: GitBox To: dev@zookeeper.apache.org Subject: [GitHub] ivmaykov commented on a change in pull request #753: ZOOKEEPER-3204: Reconfig tests are constantly failing on 3.5 after applying Java 11 fix Message-ID: <154906115471.19277.721272416825602296.gitbox@gitbox.apache.org> Date: Fri, 01 Feb 2019 22:45:54 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit ivmaykov commented on a change in pull request #753: ZOOKEEPER-3204: Reconfig tests are constantly failing on 3.5 after applying Java 11 fix URL: https://github.com/apache/zookeeper/pull/753#discussion_r253223160 ########## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ########## @@ -61,215 +26,184 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; -import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; -import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer; +import javax.net.ssl.SSLContext; +import 
javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.net.ssl.X509KeyManager; +import javax.net.ssl.X509TrustManager; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.DefaultEventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.common.ClientX509Util; +import org.apache.zookeeper.common.NettyUtils; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.common.X509Exception.SSLContextException; +import org.apache.zookeeper.server.auth.ProviderRegistry; +import org.apache.zookeeper.server.auth.X509AuthenticationProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class NettyServerCnxnFactory extends ServerCnxnFactory { private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class); - ServerBootstrap bootstrap; - Channel parentChannel; - ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns"); - HashMap> ipMap = - new HashMap>( ); - InetSocketAddress 
localAddress; - int maxClientCnxns = 60; - ClientX509Util x509Util; + private final ServerBootstrap bootstrap; + private Channel parentChannel; + private final ChannelGroup allChannels = + new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor()); + // Access to ipMap or to any Set contained in the map needs to be + // protected with synchronized (ipMap) { ... } + private final Map> ipMap = new HashMap<>(); + private InetSocketAddress localAddress; + private int maxClientCnxns = 60; + private final ClientX509Util x509Util; + + private static final AttributeKey CONNECTION_ATTRIBUTE = + AttributeKey.valueOf("NettyServerCnxn"); + + private static final AtomicReference TEST_ALLOCATOR = + new AtomicReference<>(null); /** - * This is an inner class since we need to extend SimpleChannelHandler, but + * This is an inner class since we need to extend ChannelDuplexHandler, but * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner * this class gets access to the member variables and methods. 
*/ @Sharable - class CnxnChannelHandler extends SimpleChannelHandler { + class CnxnChannelHandler extends ChannelDuplexHandler { @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception - { + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { - LOG.trace("Channel closed " + e); + LOG.trace("Channel active {}", ctx.channel()); } - allChannels.remove(ctx.getChannel()); - } - @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception - { - if (LOG.isTraceEnabled()) { - LOG.trace("Channel connected " + e); - } - - NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(), + NettyServerCnxn cnxn = new NettyServerCnxn(ctx.channel(), zkServer, NettyServerCnxnFactory.this); - ctx.setAttachment(cnxn); + ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { - SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); - ChannelFuture handshakeFuture = sslHandler.handshake(); + SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); + Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { - allChannels.add(ctx.getChannel()); + allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override - public void channelDisconnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception - { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { - LOG.trace("Channel disconnected " + e); + LOG.trace("Channel inactive {}", ctx.channel()); } - NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); + allChannels.remove(ctx.channel()); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { - LOG.trace("Channel disconnect caused close " + e); + LOG.trace("Channel inactive caused close {}", cnxn); } 
cnxn.close(); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception - { - LOG.warn("Exception caught " + e, e.getCause()); - NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.warn("Exception caught", cause); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Closing " + cnxn); + LOG.debug("Closing {}", cnxn); } cnxn.close(); } } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception - { - if (LOG.isTraceEnabled()) { - LOG.trace("message received called " + e.getMessage()); - } + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { - if (LOG.isDebugEnabled()) { - LOG.debug("New message " + e.toString() - + " from " + ctx.getChannel()); - } - NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); - synchronized(cnxn) { - processMessage(e, cnxn); + if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { + LOG.debug("Received AutoReadEvent.ENABLE"); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); + // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() + // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run + // after either of those. Check for null just to be safe ... 
+ if (cnxn != null) { + cnxn.processQueuedBuffer(); + } + ctx.channel().config().setAutoRead(true); + } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) { + LOG.debug("Received AutoReadEvent.DISABLE"); + ctx.channel().config().setAutoRead(false); } - } catch(Exception ex) { - LOG.error("Unexpected exception in receive", ex); - throw ex; + } finally { + ReferenceCountUtil.release(evt); } } - private void processMessage(MessageEvent e, NettyServerCnxn cnxn) { - if (LOG.isDebugEnabled()) { - LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: " - + cnxn.queuedBuffer); - } - - if (e instanceof NettyServerCnxn.ResumeMessageEvent) { - LOG.debug("Received ResumeMessageEvent"); - if (cnxn.queuedBuffer != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("processing queue " - + Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - cnxn.receiveMessage(cnxn.queuedBuffer); - if (!cnxn.queuedBuffer.readable()) { - LOG.debug("Processed queue - no bytes remaining"); - cnxn.queuedBuffer = null; - } else { - LOG.debug("Processed queue - bytes remaining"); - } - } else { - LOG.debug("queue empty"); - } - cnxn.channel.setReadable(true); - } else { - ChannelBuffer buf = (ChannelBuffer)e.getMessage(); + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " buf 0x" - + ChannelBuffers.hexDump(buf)); + LOG.trace("message received called {}", msg); } - - if (cnxn.throttled) { - LOG.debug("Received message while throttled"); - // we are throttled, so we need to queue - if (cnxn.queuedBuffer == null) { - LOG.debug("allocating queue"); - cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes()); - } - cnxn.queuedBuffer.writeBytes(buf); - if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); + try { + if 
(LOG.isDebugEnabled()) { + LOG.debug("New message {} from {}", msg, ctx.channel()); } - } else { - LOG.debug("not throttled"); - if (cnxn.queuedBuffer != null) { - if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - cnxn.queuedBuffer.writeBytes(buf); - if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - - cnxn.receiveMessage(cnxn.queuedBuffer); - if (!cnxn.queuedBuffer.readable()) { - LOG.debug("Processed queue - no bytes remaining"); - cnxn.queuedBuffer = null; - } else { - LOG.debug("Processed queue - bytes remaining"); - } + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); + if (cnxn == null) { + LOG.error("channelRead() on a closed or closing NettyServerCnxn"); } else { - cnxn.receiveMessage(buf); - if (buf.readable()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Before copy " + buf); - } - cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes()); - cnxn.queuedBuffer.writeBytes(buf); - if (LOG.isTraceEnabled()) { - LOG.trace("Copy is " + cnxn.queuedBuffer); - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - } + cnxn.processMessage((ByteBuf) msg); } + } catch (Exception ex) { + LOG.error("Unexpected exception in receive", ex); + throw ex; } + } finally { + ReferenceCountUtil.release(msg); } } @Override - public void writeComplete(ChannelHandlerContext ctx, - WriteCompletionEvent e) throws Exception - { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (LOG.isTraceEnabled()) { - LOG.trace("write complete " + e); + promise.addListener((future) -> { + LOG.trace("write {}", + future.isSuccess() ? 
"complete" : "failed"); + }); } + super.write(ctx, msg, promise); } - private final class CertificateVerifier - implements ChannelFutureListener { + private final class CertificateVerifier implements GenericFutureListener> { Review comment: I found some issues with this code, which I fixed internally but haven't yet upstreamed to master. So, as-is, client TLS might not always work on 3.5. There are some cases where it doesn't work - specifically, I think anonymous TLS is not possible because line 226 assumes that the client always has certificates, which is not true for anonymous clients. Nothing to do as part of this PR, but I will look into upstreaming my fixes. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services