zookeeper-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] ivmaykov commented on a change in pull request #753: ZOOKEEPER-3204: Reconfig tests are constantly failing on 3.5 after applying Java 11 fix
Date Fri, 01 Feb 2019 22:45:54 GMT
ivmaykov commented on a change in pull request #753: ZOOKEEPER-3204: Reconfig tests are constantly
failing on 3.5 after applying Java 11 fix
URL: https://github.com/apache/zookeeper/pull/753#discussion_r253223160
 
 

 ##########
 File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ##########
 @@ -61,215 +26,184 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
-import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.X509KeyManager;
+import javax.net.ssl.X509TrustManager;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.AttributeKey;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.common.NettyUtils;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.common.X509Exception.SSLContextException;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class NettyServerCnxnFactory extends ServerCnxnFactory {
     private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class);
 
-    ServerBootstrap bootstrap;
-    Channel parentChannel;
-    ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns");
-    HashMap<InetAddress, Set<NettyServerCnxn>> ipMap =
-        new HashMap<InetAddress, Set<NettyServerCnxn>>( );
-    InetSocketAddress localAddress;
-    int maxClientCnxns = 60;
-    ClientX509Util x509Util;
+    private final ServerBootstrap bootstrap;
+    private Channel parentChannel;
+    private final ChannelGroup allChannels =
+            new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor());
+    // Access to ipMap or to any Set contained in the map needs to be
+    // protected with synchronized (ipMap) { ... }
+    private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<>();
+    private InetSocketAddress localAddress;
+    private int maxClientCnxns = 60;
+    private final ClientX509Util x509Util;
+
+    private static final AttributeKey<NettyServerCnxn> CONNECTION_ATTRIBUTE =
+            AttributeKey.valueOf("NettyServerCnxn");
+
+    private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+            new AtomicReference<>(null);
 
     /**
-     * This is an inner class since we need to extend SimpleChannelHandler, but
+     * This is an inner class since we need to extend ChannelDuplexHandler, but
      * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
      * this class gets access to the member variables and methods.
      */
     @Sharable
-    class CnxnChannelHandler extends SimpleChannelHandler {
+    class CnxnChannelHandler extends ChannelDuplexHandler {
 
         @Override
-        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
-            throws Exception
-        {
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel closed " + e);
+                LOG.trace("Channel active {}", ctx.channel());
             }
-            allChannels.remove(ctx.getChannel());
-        }
 
-        @Override
-        public void channelConnected(ChannelHandlerContext ctx,
-                ChannelStateEvent e) throws Exception
-        {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel connected " + e);
-            }
-
-            NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(),
+            NettyServerCnxn cnxn = new NettyServerCnxn(ctx.channel(),
                     zkServer, NettyServerCnxnFactory.this);
-            ctx.setAttachment(cnxn);
+            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
             if (secure) {
-                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
-                ChannelFuture handshakeFuture = sslHandler.handshake();
+                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
+                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                 handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
             } else {
-                allChannels.add(ctx.getChannel());
+                allChannels.add(ctx.channel());
                 addCnxn(cnxn);
             }
         }
 
         @Override
-        public void channelDisconnected(ChannelHandlerContext ctx,
-                ChannelStateEvent e) throws Exception
-        {
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel disconnected " + e);
+                LOG.trace("Channel inactive {}", ctx.channel());
             }
-            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+            allChannels.remove(ctx.channel());
+            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
             if (cnxn != null) {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("Channel disconnect caused close " + e);
+                    LOG.trace("Channel inactive caused close {}", cnxn);
                 }
                 cnxn.close();
             }
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-            throws Exception
-        {
-            LOG.warn("Exception caught " + e, e.getCause());
-            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
+            LOG.warn("Exception caught", cause);
+            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
             if (cnxn != null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Closing " + cnxn);
+                    LOG.debug("Closing {}", cnxn);
                 }
                 cnxn.close();
             }
         }
 
         @Override
-        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-            throws Exception
-        {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("message received called " + e.getMessage());
-            }
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
{
             try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("New message " + e.toString()
-                            + " from " + ctx.getChannel());
-                }
-                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
-                synchronized(cnxn) {
-                    processMessage(e, cnxn);
+                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
+                    LOG.debug("Received AutoReadEvent.ENABLE");
+                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
+                    // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run
+                    // after either of those. Check for null just to be safe ...
+                    if (cnxn != null) {
+                        cnxn.processQueuedBuffer();
+                    }
+                    ctx.channel().config().setAutoRead(true);
+                } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
+                    LOG.debug("Received AutoReadEvent.DISABLE");
+                    ctx.channel().config().setAutoRead(false);
                 }
-            } catch(Exception ex) {
-                LOG.error("Unexpected exception in receive", ex);
-                throw ex;
+            } finally {
+                ReferenceCountUtil.release(evt);
             }
         }
 
-        private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: "
-                        + cnxn.queuedBuffer);
-            }
-
-            if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
-                LOG.debug("Received ResumeMessageEvent");
-                if (cnxn.queuedBuffer != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("processing queue "
-                                + Long.toHexString(cnxn.sessionId)
-                                + " queuedBuffer 0x"
-                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                    }
-                    cnxn.receiveMessage(cnxn.queuedBuffer);
-                    if (!cnxn.queuedBuffer.readable()) {
-                        LOG.debug("Processed queue - no bytes remaining");
-                        cnxn.queuedBuffer = null;
-                    } else {
-                        LOG.debug("Processed queue - bytes remaining");
-                    }
-                } else {
-                    LOG.debug("queue empty");
-                }
-                cnxn.channel.setReadable(true);
-            } else {
-                ChannelBuffer buf = (ChannelBuffer)e.getMessage();
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+            try {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace(Long.toHexString(cnxn.sessionId)
-                            + " buf 0x"
-                            + ChannelBuffers.hexDump(buf));
+                    LOG.trace("message received called {}", msg);
                 }
-                
-                if (cnxn.throttled) {
-                    LOG.debug("Received message while throttled");
-                    // we are throttled, so we need to queue
-                    if (cnxn.queuedBuffer == null) {
-                        LOG.debug("allocating queue");
-                        cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
-                    }
-                    cnxn.queuedBuffer.writeBytes(buf);
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace(Long.toHexString(cnxn.sessionId)
-                                + " queuedBuffer 0x"
-                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+                try {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("New message {} from {}", msg, ctx.channel());
                     }
-                } else {
-                    LOG.debug("not throttled");
-                    if (cnxn.queuedBuffer != null) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace(Long.toHexString(cnxn.sessionId)
-                                    + " queuedBuffer 0x"
-                                    + ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                        }
-                        cnxn.queuedBuffer.writeBytes(buf);
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace(Long.toHexString(cnxn.sessionId)
-                                    + " queuedBuffer 0x"
-                                    + ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                        }
-
-                        cnxn.receiveMessage(cnxn.queuedBuffer);
-                        if (!cnxn.queuedBuffer.readable()) {
-                            LOG.debug("Processed queue - no bytes remaining");
-                            cnxn.queuedBuffer = null;
-                        } else {
-                            LOG.debug("Processed queue - bytes remaining");
-                        }
+                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+                    if (cnxn == null) {
+                        LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                     } else {
-                        cnxn.receiveMessage(buf);
-                        if (buf.readable()) {
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Before copy " + buf);
-                            }
-                            cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes()); 
-                            cnxn.queuedBuffer.writeBytes(buf);
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Copy is " + cnxn.queuedBuffer);
-                                LOG.trace(Long.toHexString(cnxn.sessionId)
-                                        + " queuedBuffer 0x"
-                                        + ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                            }
-                        }
+                        cnxn.processMessage((ByteBuf) msg);
                     }
+                } catch (Exception ex) {
+                    LOG.error("Unexpected exception in receive", ex);
+                    throw ex;
                 }
+            } finally {
+                ReferenceCountUtil.release(msg);
             }
         }
 
         @Override
-        public void writeComplete(ChannelHandlerContext ctx,
-                WriteCompletionEvent e) throws Exception
-        {
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("write complete " + e);
+                promise.addListener((future) -> {
+                    LOG.trace("write {}",
+                            future.isSuccess() ? "complete" : "failed");
+                });
             }
+            super.write(ctx, msg, promise);
         }
 
-        private final class CertificateVerifier
-                implements ChannelFutureListener {
+        private final class CertificateVerifier implements GenericFutureListener<Future<Channel>>
{
 
 Review comment:
   I found some issues with this code, which I have fixed internally but have not yet upstreamed
to master. As a result, client TLS might not always work on 3.5 as-is. There are some cases
where it does not work: specifically, I think anonymous TLS is not possible, because line 226
assumes that the client always has certificates, which is not true for anonymous clients.
Nothing needs to be done as part of this PR, but I will look into upstreaming my fixes.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message