zookeeper-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] anmolnar commented on a change in pull request #753: ZOOKEEPER-3204: Reconfig tests are constantly failing on 3.5 after applying Java 11 fix
Date Tue, 05 Feb 2019 14:53:13 GMT
anmolnar commented on a change in pull request #753: ZOOKEEPER-3204: Reconfig tests are constantly
failing on 3.5 after applying Java 11 fix
URL: https://github.com/apache/zookeeper/pull/753#discussion_r253896039
 
 

 ##########
 File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ##########
 @@ -61,215 +26,184 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
-import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.X509KeyManager;
+import javax.net.ssl.X509TrustManager;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.AttributeKey;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.common.NettyUtils;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.common.X509Exception.SSLContextException;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class NettyServerCnxnFactory extends ServerCnxnFactory {
     private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class);
 
-    ServerBootstrap bootstrap;
-    Channel parentChannel;
-    ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns");
-    HashMap<InetAddress, Set<NettyServerCnxn>> ipMap =
-        new HashMap<InetAddress, Set<NettyServerCnxn>>( );
-    InetSocketAddress localAddress;
-    int maxClientCnxns = 60;
-    ClientX509Util x509Util;
+    private final ServerBootstrap bootstrap;
+    private Channel parentChannel;
+    private final ChannelGroup allChannels =
+            new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor());
+    // Access to ipMap or to any Set contained in the map needs to be
+    // protected with synchronized (ipMap) { ... }
+    private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<>();
+    private InetSocketAddress localAddress;
+    private int maxClientCnxns = 60;
+    private final ClientX509Util x509Util;
+
+    private static final AttributeKey<NettyServerCnxn> CONNECTION_ATTRIBUTE =
+            AttributeKey.valueOf("NettyServerCnxn");
+
+    private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+            new AtomicReference<>(null);
 
     /**
-     * This is an inner class since we need to extend SimpleChannelHandler, but
+     * This is an inner class since we need to extend ChannelDuplexHandler, but
      * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
      * this class gets access to the member variables and methods.
      */
     @Sharable
-    class CnxnChannelHandler extends SimpleChannelHandler {
+    class CnxnChannelHandler extends ChannelDuplexHandler {
 
         @Override
-        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
-            throws Exception
-        {
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel closed " + e);
+                LOG.trace("Channel active {}", ctx.channel());
             }
-            allChannels.remove(ctx.getChannel());
-        }
 
-        @Override
-        public void channelConnected(ChannelHandlerContext ctx,
-                ChannelStateEvent e) throws Exception
-        {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel connected " + e);
-            }
-
-            NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(),
+            NettyServerCnxn cnxn = new NettyServerCnxn(ctx.channel(),
                     zkServer, NettyServerCnxnFactory.this);
-            ctx.setAttachment(cnxn);
+            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
             if (secure) {
-                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
-                ChannelFuture handshakeFuture = sslHandler.handshake();
+                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
+                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                 handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
             } else {
-                allChannels.add(ctx.getChannel());
+                allChannels.add(ctx.channel());
                 addCnxn(cnxn);
             }
         }
 
         @Override
-        public void channelDisconnected(ChannelHandlerContext ctx,
-                ChannelStateEvent e) throws Exception
-        {
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel disconnected " + e);
+                LOG.trace("Channel inactive {}", ctx.channel());
             }
-            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+            allChannels.remove(ctx.channel());
+            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
             if (cnxn != null) {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("Channel disconnect caused close " + e);
+                    LOG.trace("Channel inactive caused close {}", cnxn);
                 }
                 cnxn.close();
             }
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-            throws Exception
-        {
-            LOG.warn("Exception caught " + e, e.getCause());
-            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
+            LOG.warn("Exception caught", cause);
+            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
             if (cnxn != null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Closing " + cnxn);
+                    LOG.debug("Closing {}", cnxn);
                 }
                 cnxn.close();
             }
         }
 
         @Override
-        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-            throws Exception
-        {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("message received called " + e.getMessage());
-            }
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
{
             try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("New message " + e.toString()
-                            + " from " + ctx.getChannel());
-                }
-                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
-                synchronized(cnxn) {
-                    processMessage(e, cnxn);
+                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
+                    LOG.debug("Received AutoReadEvent.ENABLE");
+                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if
channelInactive()
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message