hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ph...@apache.org
Subject svn commit: r1022642 [1/2] - in /hadoop/zookeeper/branches/ZOOKEEPER-823: ./ src/docs/src/documentation/content/xdocs/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/server/ src/java/test/config/ src/java/test/org/apache/zookeep...
Date Thu, 14 Oct 2010 18:45:49 GMT
Author: phunt
Date: Thu Oct 14 18:45:47 2010
New Revision: 1022642

URL: http://svn.apache.org/viewvc?rev=1022642&view=rev
Log:
Updated patch from 10/14/2010, adds Netty client support.

Added:
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/ThreadUtil.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/test/NettyNioSuiteBase.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/test/NettyNioSuiteHammerTest.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/test/NettyNioSuiteTest.java
Modified:
    hadoop/zookeeper/branches/ZOOKEEPER-823/ivy.xml
    hadoop/zookeeper/branches/ZOOKEEPER-823/ivysettings.xml
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxn.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ZooKeeper.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/config/findbugsExcludeFile.xml
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/test/ClientBase.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java
    hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/test/ClientTest.java

Modified: hadoop/zookeeper/branches/ZOOKEEPER-823/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/ivy.xml?rev=1022642&r1=1022641&r2=1022642&view=diff
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/ivy.xml (original)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/ivy.xml Thu Oct 14 18:45:47 2010
@@ -41,7 +41,7 @@
     <dependency org="log4j" name="log4j" rev="1.2.15" transitive="false" conf="default"/>
     <dependency org="jline" name="jline" rev="0.9.94" transitive="false" conf="default"/>
 
-    <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.1.5.GA">
+    <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.2.1.Final">
       <artifact name="netty" type="jar" conf="default"/>
     </dependency>
 

Modified: hadoop/zookeeper/branches/ZOOKEEPER-823/ivysettings.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/ivysettings.xml?rev=1022642&r1=1022641&r2=1022642&view=diff
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/ivysettings.xml (original)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/ivysettings.xml Thu Oct 14 18:45:47 2010
@@ -20,7 +20,7 @@
   <property name="repo.maven.org"
     value="http://repo1.maven.org/maven2/" override="false"/>
   <property name="repo.jboss.org"
-    value="http://repository.jboss.com/maven2/" override="false"/>
+    value="http://repository.jboss.org/nexus/content/groups/public/" override="false"/>
   <property name="repo.sun.org"
     value="http://download.java.net/maven/2/" override="false"/>
   <property name="maven2.pattern"

Modified: hadoop/zookeeper/branches/ZOOKEEPER-823/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml?rev=1022642&r1=1022641&r2=1022642&view=diff
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml (original)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml Thu Oct 14 18:45:47 2010
@@ -1050,13 +1050,18 @@ server.3=zoo3:2888:3888</programlisting>
         <para>Prior to version 3.4 ZooKeeper has always used NIO
             directly, however in versions 3.4 and later Netty is
             supported as an option to NIO (replaces). NIO continues to
-            be the default, however Netty based communication can be
-            used in place of NIO by setting the environment variable
+            be the default for both server and client; however, Netty
+            based communication can be used in place of NIO by
+            setting environment variables.  Set
             "zookeeper.serverCnxnFactory" to
-            "org.apache.zookeeper.server.NettyServerCnxnFactory". You
-            have the option of setting this on either the client(s) or
-            server(s), typically you would want to set this on both,
-            however that is at your discretion.
+            "org.apache.zookeeper.server.NettyServerCnxnFactory" to
+            enable the server to use Netty.  Set
+            "zookeeper.clientCnxnFactory" to
+            "org.apache.zookeeper.NettyClientCnxnFactory" to enable
+            the client to use Netty. You have the option to set this
+            on either the client(s) or server(s); typically you would
+            want to set this on both, however that is at your
+            discretion.
         </para>
         <para>
           TBD - tuning options for netty - currently there are none that are netty specific but we should add some. Esp around max bound on the number of reader worker threads netty creates.

Modified: hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1022642&r1=1022641&r2=1022642&view=diff
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxn.java Thu Oct 14 18:45:47 2010
@@ -25,9 +25,6 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -55,7 +52,6 @@ import org.apache.zookeeper.ZooKeeper.Wa
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.proto.AuthPacket;
 import org.apache.zookeeper.proto.ConnectRequest;
-import org.apache.zookeeper.proto.ConnectResponse;
 import org.apache.zookeeper.proto.CreateResponse;
 import org.apache.zookeeper.proto.ExistsResponse;
 import org.apache.zookeeper.proto.GetACLResponse;
@@ -85,8 +81,6 @@ public class ClientCnxn {
      * option allows the client to turn off this behavior by setting
      * the environment variable "zookeeper.disableAutoWatchReset" to "true" */
     private static boolean disableAutoWatchReset;
-
-    public static final int packetLen;
     static {
         // this var should not be public, but otw there is no easy way
         // to test
@@ -96,7 +90,6 @@ public class ClientCnxn {
             LOG.debug("zookeeper.disableAutoWatchReset is "
                     + disableAutoWatchReset);
         }
-        packetLen = Integer.getInteger("jute.maxbuffer", 4096 * 1024);
     }
 
     private final ArrayList<InetSocketAddress> serverAddrs =
@@ -154,15 +147,13 @@ public class ClientCnxn {
 
     final EventThread eventThread;
 
-    final Selector selector = Selector.open();
-
     /**
      * Set to true when close is called. Latches the connection such that we
      * don't attempt to re-connect to the server if in the middle of closing the
      * connection (client sends session disconnect to server as part of close
      * operation)
      */
-    volatile boolean closing = false;
+    private volatile boolean closing = false;
 
     public long getSessionId() {
         return sessionId;
@@ -180,56 +171,22 @@ public class ClientCnxn {
     public String toString() {
         StringBuilder sb = new StringBuilder();
 
-        SocketAddress local = getLocalSocketAddress();
-        SocketAddress remote = getRemoteSocketAddress();
+        SocketAddress local = sendThread.getSocket().getLocalSocketAddress();
+        SocketAddress remote = sendThread.getSocket().getRemoteSocketAddress();
         sb
             .append("sessionid:0x").append(Long.toHexString(getSessionId()))
             .append(" local:").append(local)
             .append(" remoteserver:").append(remote)
             .append(" lastZxid:").append(lastZxid)
             .append(" xid:").append(xid)
-            .append(" sent:").append(sendThread.sentCount)
-            .append(" recv:").append(sendThread.recvCount)
+            .append(" sent:").append(sendThread.getSocket().getSentCount())
+            .append(" recv:").append(sendThread.getSocket().getRecvCount())
             .append(" queuedpkts:").append(outgoingQueue.size())
             .append(" pendingresp:").append(pendingQueue.size())
             .append(" queuedevents:").append(eventThread.waitingEvents.size());
 
         return sb.toString();
     }
-    
-    /**
-     * Returns the address to which the socket is connected.
-     * @return ip address of the remote side of the connection or null if
-     *         not connected
-     */
-    SocketAddress getRemoteSocketAddress() {
-        // a lot could go wrong here, so rather than put in a bunch of code
-        // to check for nulls all down the chain let's do it the simple
-        // yet bulletproof way
-        try {
-            return ((SocketChannel)sendThread.sockKey.channel())
-                .socket().getRemoteSocketAddress();
-        } catch (NullPointerException e) {
-            return null;
-        }
-    }
-
-    /** 
-     * Returns the local address to which the socket is bound.
-     * @return ip address of the remote side of the connection or null if
-     *         not connected
-     */
-    SocketAddress getLocalSocketAddress() {
-        // a lot could go wrong here, so rather than put in a bunch of code
-        // to check for nulls all down the chain let's do it the simple
-        // yet bulletproof way
-        try {
-            return ((SocketChannel)sendThread.sockKey.channel())
-                .socket().getLocalSocketAddress();
-        } catch (NullPointerException e) {
-            return null;
-        }
-    }
 
     /**
      * This class allows us to pass the headers and the relevant records around.
@@ -322,10 +279,9 @@ public class ClientCnxn {
      * @throws IOException
      */
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
-            ClientWatchManager watcher)
-        throws IOException
-    {
-        this(hosts, sessionTimeout, zooKeeper, watcher, 0, new byte[16]);
+            ClientWatchManager watcher, ClientCnxnSocket socket)
+            throws IOException {
+        this(hosts, sessionTimeout, zooKeeper, watcher, socket, 0, new byte[16]);
     }
 
     /**
@@ -345,9 +301,8 @@ public class ClientCnxn {
      * @throws IOException
      */
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
-            ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
-        throws IOException
-    {
+            ClientWatchManager watcher, ClientCnxnSocket socket,
+            long sessionId, byte[] sessionPasswd) throws IOException {
         this.zooKeeper = zooKeeper;
         this.watcher = watcher;
         this.sessionId = sessionId;
@@ -389,7 +344,7 @@ public class ClientCnxn {
         connectTimeout = sessionTimeout / hostsList.length;
         readTimeout = sessionTimeout * 2 / 3;
         Collections.shuffle(serverAddrs);
-        sendThread = new SendThread();
+        sendThread = new SendThread(socket);
         eventThread = new EventThread();
     }
 
@@ -412,9 +367,10 @@ public class ClientCnxn {
         eventThread.start();
     }
 
-    Object eventOfDeath = new Object();
+    private Object eventOfDeath = new Object();
 
-    final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+    private final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+        @Override
         public void uncaughtException(Thread t, Throwable e) {
             LOG.error("from " + t.getName(), e);
         }
@@ -618,7 +574,7 @@ public class ClientCnxn {
         if (p.replyHeader == null) {
             return;
         }
-        switch(zooKeeper.state) {
+        switch(state) {
         case AUTH_FAILED:
             p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
             break;
@@ -631,15 +587,16 @@ public class ClientCnxn {
         finishPacket(p);
     }
 
-    volatile long lastZxid;
+    private volatile long lastZxid;
 
-    private static class EndOfStreamException extends IOException {
+    static class EndOfStreamException extends IOException {
         private static final long serialVersionUID = -5438877188796231422L;
 
         public EndOfStreamException(String msg) {
             super(msg);
         }
-        
+
+        @Override
         public String toString() {
             return "EndOfStreamException: " + getMessage();
         }
@@ -652,7 +609,7 @@ public class ClientCnxn {
             super(msg);
         }
     }
-    
+
     private static class SessionExpiredException extends IOException {
         private static final long serialVersionUID = -1388816932076193249L;
 
@@ -660,279 +617,157 @@ public class ClientCnxn {
             super(msg);
         }
     }
-    
+
+    public static final int packetLen =
+        Integer.getInteger("jute.maxbuffer", 4096 * 1024);
+
     /**
      * This class services the outgoing request queue and generates the heart
      * beats. It also spawns the ReadThread.
      */
     class SendThread extends Thread {
-        SelectionKey sockKey;
-
-        final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
-
-        ByteBuffer incomingBuffer = lenBuffer;
-
-        boolean initialized;
-
         private long lastPingSentNs;
-
-        long sentCount = 0;
-        long recvCount = 0;
-
-        void readLength() throws IOException {
-            int len = incomingBuffer.getInt();
-            if (len < 0 || len >= packetLen) {
-                throw new IOException("Packet len" + len + " is out of range!");
-            }
-            incomingBuffer = ByteBuffer.allocate(len);
-        }
-
-        void readConnectResult() throws IOException {
-            if (LOG.isTraceEnabled()) {
-                StringBuffer buf = new StringBuffer("0x[");
-                for (byte b : incomingBuffer.array()) {
-                    buf.append(Integer.toHexString(b) + ",");
-                }
-                buf.append("]");
-                LOG.trace("readConnectRestult " + incomingBuffer.remaining() 
-                        + " " + buf.toString());
-            }
-            ByteBufferInputStream bbis = new ByteBufferInputStream(
-                    incomingBuffer);
-            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
-            ConnectResponse conRsp = new ConnectResponse();
-            conRsp.deserialize(bbia, "connect");
-            negotiatedSessionTimeout = conRsp.getTimeOut();
-            if (negotiatedSessionTimeout <= 0) {
-                zooKeeper.state = States.CLOSED;
-
-                eventThread.queueEvent(new WatchedEvent(
-                        Watcher.Event.EventType.None,
-                        Watcher.Event.KeeperState.Expired, null));
-                eventThread.queueEventOfDeath();
-                throw new SessionExpiredException(
-                        "Unable to reconnect to ZooKeeper service, session 0x"
-                        + Long.toHexString(sessionId) + " has expired");
-            }
-            readTimeout = negotiatedSessionTimeout * 2 / 3;
-            connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
-            sessionId = conRsp.getSessionId();
-            sessionPasswd = conRsp.getPasswd();
-            zooKeeper.state = States.CONNECTED;
-            LOG.info("Session establishment complete on server "
-                    + ((SocketChannel)sockKey.channel())
-                        .socket().getRemoteSocketAddress()
-                    + ", sessionid = 0x"
-                    + Long.toHexString(sessionId)
-                    + ", negotiated timeout = " + negotiatedSessionTimeout);
-            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
-                    Watcher.Event.KeeperState.SyncConnected, null));
+        private final ClientCnxnSocket socket;
+        private int lastConnectIndex = -1;
+        private int currentConnectIndex;
+        private Random r = new Random(System.nanoTime());
+
+        SendThread(ClientCnxnSocket socket) {
+            super(currentThread().getName() + "-SendThread()");
+            state = States.CONNECTING;
+            this.socket = socket;
+            socket.introduce(this, outgoingQueue, sessionId);
+            setUncaughtExceptionHandler(uncaughtExceptionHandler);
+            setDaemon(true);
         }
 
-        void readResponse() throws IOException {
-            ByteBufferInputStream bbis = new ByteBufferInputStream(
-                    incomingBuffer);
-            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
-            ReplyHeader replyHdr = new ReplyHeader();
-
-            replyHdr.deserialize(bbia, "header");
-            if (replyHdr.getXid() == -2) {
-                // -2 is the xid for pings
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got ping response for sessionid: 0x"
-                            + Long.toHexString(sessionId)
-                            + " after "
-                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
-                            + "ms");
-                }
-                return;
-            }
-            if (replyHdr.getXid() == -4) {
-                // -4 is the xid for AuthPacket               
-                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
-                    zooKeeper.state = States.AUTH_FAILED;                    
-                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
-                            Watcher.Event.KeeperState.AuthFailed, null) );            		            		
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got auth sessionid:0x"
-                            + Long.toHexString(sessionId));
-                }
-                return;
-            }
-            if (replyHdr.getXid() == -1) {
-                // -1 means notification
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got notification sessionid:0x"
-                        + Long.toHexString(sessionId));
-                }
-                WatcherEvent event = new WatcherEvent();
-                event.deserialize(bbia, "response");
-
-                // convert from a server path to a client path
-                if (chrootPath != null) {
-                    String serverPath = event.getPath();
-                    if(serverPath.compareTo(chrootPath)==0)
-                        event.setPath("/");
-                    else
-                        event.setPath(serverPath.substring(chrootPath.length()));
-                }
-
-                WatchedEvent we = new WatchedEvent(event);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got " + we + " for sessionid 0x"
-                            + Long.toHexString(sessionId));
-                }
-
-                eventThread.queueEvent( we );
-                return;
-            }
-            if (pendingQueue.size() == 0) {
-                throw new IOException("Nothing in the queue, but got "
-                        + replyHdr.getXid());
-            }
-            Packet packet = null;
-            synchronized (pendingQueue) {
-                packet = pendingQueue.remove();
-            }
-            /*
-             * Since requests are processed in order, we better get a response
-             * to the first request!
-             */
-            try {
-                if (packet.header.getXid() != replyHdr.getXid()) {
-                    packet.replyHeader.setErr(
-                            KeeperException.Code.CONNECTIONLOSS.intValue());
-                    throw new IOException("Xid out of order. Got "
-                            + replyHdr.getXid() + " expected "
-                            + packet.header.getXid());
-                }
-
-                packet.replyHeader.setXid(replyHdr.getXid());
-                packet.replyHeader.setErr(replyHdr.getErr());
-                packet.replyHeader.setZxid(replyHdr.getZxid());
-                if (replyHdr.getZxid() > 0) {
-                    lastZxid = replyHdr.getZxid();
-                }
-                if (packet.response != null && replyHdr.getErr() == 0) {
-                    packet.response.deserialize(bbia, "response");
-                }
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Reading reply sessionid:0x"
-                            + Long.toHexString(sessionId) + ", packet:: " + packet);
-                }
-            } finally {
-                finishPacket(packet);
-            }
-        }
+        @Override
+        public void run() {
+            socket.updateNow();
+            socket.updateLastSendAndHeard();
 
-        /**
-         * @return true if a packet was received
-         * @throws InterruptedException
-         * @throws IOException
-         */
-        boolean doIO() throws InterruptedException, IOException {
-            boolean packetReceived = false;
-            SocketChannel sock = (SocketChannel) sockKey.channel();
-            if (sock == null) {
-                throw new IOException("Socket is null!");
-            }
-            if (sockKey.isReadable()) {
-                int rc = sock.read(incomingBuffer);
-                if (rc < 0) {
-                    throw new EndOfStreamException(
-                            "Unable to read additional data from server sessionid 0x"
-                            + Long.toHexString(sessionId)
-                            + ", likely server has closed socket");
-                }
-                if (!incomingBuffer.hasRemaining()) {
-                    incomingBuffer.flip();
-                    if (incomingBuffer == lenBuffer) {
-                        recvCount++;
-                        readLength();
-                    } else if (!initialized) {
-                        readConnectResult();
-                        enableRead();
-                        if (!outgoingQueue.isEmpty()) {
-                            enableWrite();
+            while (state.isAlive()) {
+                try {
+                    if (!socket.isConnected()) {
+                        // don't re-establish connection if we are closing
+                        if (closing) {
+                            break;
                         }
-                        lenBuffer.clear();
-                        incomingBuffer = lenBuffer;
-                        packetReceived = true;
-                        initialized = true;
-                    } else {
-                        readResponse();
-                        lenBuffer.clear();
-                        incomingBuffer = lenBuffer;
-                        packetReceived = true;
+                        startConnect();
+                        socket.updateLastSendAndHeard();
                     }
-                }
-            }
-            if (sockKey.isWritable()) {
-                synchronized (outgoingQueue) {
-                    if (!outgoingQueue.isEmpty()) {
-                        ByteBuffer pbb = outgoingQueue.getFirst().bb;
-                        sock.write(pbb);
-                        if (!pbb.hasRemaining()) {
-                            sentCount++;
-                            Packet p = outgoingQueue.removeFirst();
-                            if (p.header != null
-                                    && p.header.getType() != OpCode.ping
-                                    && p.header.getType() != OpCode.auth) {
-                                pendingQueue.add(p);
+
+                    int to = readTimeout - socket.getIdleRecv();
+                    if (state != States.CONNECTED) {
+                        to = connectTimeout - socket.getIdleRecv();
+                    }
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("TO=" + to);
+                    }
+                    if (to <= 0) {
+                        throw new SessionTimeoutException(
+                                "Client session timed out, have not heard from server in "
+                                        + socket.getIdleRecv() + "ms"
+                                        + " for sessionid 0x"
+                                        + Long.toHexString(sessionId));
+                    }
+                    if (state == States.CONNECTED) {
+                        int timeToNextPing = readTimeout / 2
+                                - socket.getIdleSend();
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("timeToNextPing=" + timeToNextPing);
+                        }
+                        if (timeToNextPing <= 0) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("timeToNextPing=" + timeToNextPing);
+                            }
+                            sendPing();
+                            socket.updateLastSend();
+                            socket.enableWrite();
+                        } else {
+                            if (timeToNextPing < to) {
+                                to = timeToNextPing;
                             }
                         }
                     }
-                }
-            }
-            if (outgoingQueue.isEmpty()) {
-                disableWrite();
-            } else {
-                enableWrite();
-            }
-            return packetReceived;
-        }
 
-        synchronized private void enableWrite() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_WRITE) == 0) {
-                sockKey.interestOps(i | SelectionKey.OP_WRITE);
-            }
-        }
-
-        synchronized private void disableWrite() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_WRITE) != 0) {
-                sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
-            }
-        }
-
-        synchronized private void enableRead() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_READ) == 0) {
-                sockKey.interestOps(i | SelectionKey.OP_READ);
+                    socket.doTransport(to, pendingQueue);
+                } catch (Exception e) {
+                    if (closing) {
+                        if (LOG.isDebugEnabled()) {
+                            // closing so this is expected
+                            LOG.debug("An exception was thrown while closing send thread for session 0x"
+                                    + Long.toHexString(getSessionId())
+                                    + " : " + e.getMessage());
+                        }
+                        break;
+                    } else {
+                        // this is ugly, you have a better way speak up
+                        if (e instanceof SessionExpiredException) {
+                            LOG.info(e.getMessage() + ", closing socket connection");
+                        } else if (e instanceof SessionTimeoutException) {
+                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
+                        } else if (e instanceof EndOfStreamException) {
+                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
+                        } else {
+                            LOG.warn("Session 0x"
+                                    + Long.toHexString(getSessionId())
+                                    + " for server "
+                                    // TODO: this is different in Netty
+                                            // and NIO
+                                            // Netty:
+                                            // + getRemoteSocketAddress()
+                                            // NIO:
+                                            // +
+                                            // ((SocketChannel)sockKey.channel())
+                                            // .socket().getRemoteSocketAddress()
+                                    + socket.getRemoteSocketAddress()
+                                    + ", unexpected error"
+                                    + RETRY_CONN_MSG,
+                                    e);
+                        }
+                        socket.cleanup();
+                        if (state.isAlive()) {
+                            eventThread.queueEvent(new WatchedEvent(
+                                    Event.EventType.None,
+                                    Event.KeeperState.Disconnected,
+                                    null));
+                        }
+                        socket.updateNow();
+                        socket.updateLastSendAndHeard();
+                    }
+                } // catch
+            } // while
+            socket.cleanup();
+            socket.close();
+            if (state.isAlive()) {
+                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
+                        Event.KeeperState.Disconnected, null));
             }
+            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
+                    "SendThread exitedloop.");
         }
 
-        synchronized private void disableRead() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_READ) != 0) {
-                sockKey.interestOps(i & (~SelectionKey.OP_READ));
-            }
+        // TODO: can not name this method getState since Thread.getState()
+        // already exists
+        // It would be cleaner to make class SendThread an implementation of
+        // Runnable
+        /**
+         * Used by ClientCnxnSocket
+         * 
+         * @return
+         */
+        ZooKeeper.States getZkState() {
+            return state;
         }
 
-        SendThread() {
-            super(makeThreadName("-SendThread()"));
-            zooKeeper.state = States.CONNECTING;
-            setUncaughtExceptionHandler(uncaughtExceptionHandler);
-            setDaemon(true);
+        ClientCnxnSocket getSocket() {
+            return socket;
         }
 
-        private void primeConnection(SelectionKey k) throws IOException {
+        void primeConnection() throws IOException {
             LOG.info("Socket connection established to "
-                    + ((SocketChannel)sockKey.channel())
-                        .socket().getRemoteSocketAddress()
+                    + socket.getRemoteSocketAddress() 
                     + ", initiating session");
             lastConnectIndex = currentConnectIndex;
             ConnectRequest conReq = new ConnectRequest(0, lastZxid,
@@ -948,6 +783,8 @@ public class ClientCnxn {
             synchronized (outgoingQueue) {
                 // We add backwards since we are pushing into the front
                 // Only send if there's a pending watch
+                // TODO: here we have the only remaining use of zooKeeper in
+                // this class. It's to be eliminated!
                 if (!disableAutoWatchReset &&
                         (!zooKeeper.getDataWatches().isEmpty()
                          || !zooKeeper.getExistWatches().isEmpty()
@@ -965,21 +802,23 @@ public class ClientCnxn {
                     outgoingQueue.addFirst(packet);
                 }
 
-                for (AuthData id : authInfo) {
-                    outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
-                            OpCode.auth), null, new AuthPacket(0, id.scheme,
-                            id.data), null, null, null));
+                synchronized (authInfo) {
+                    for (AuthData id : authInfo) {
+                        outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
+                                OpCode.auth), null, new AuthPacket(0,
+                                id.scheme, id.data), null, null, null));
+                    }
                 }
+
                 outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
                         null)));
             }
-            synchronized (this) {
-                k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+            synchronized(socket) {
+                socket.enableReadWriteOnly();
             }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Session establishment request sent on "
-                        + ((SocketChannel)sockKey.channel())
-                            .socket().getRemoteSocketAddress());
+                        + socket.getRemoteSocketAddress());
             }
         }
 
@@ -989,12 +828,6 @@ public class ClientCnxn {
             queuePacket(h, null, null, null, null, null, null, null, null);
         }
 
-        int lastConnectIndex = -1;
-
-        int currentConnectIndex;
-
-        Random r = new Random(System.nanoTime());
-
         private void startConnect() throws IOException {
             if (lastConnectIndex == -1) {
                 // We don't want to delay the first try at a connect, so we
@@ -1015,7 +848,7 @@ public class ClientCnxn {
                     }
                 }
             }
-            zooKeeper.state = States.CONNECTING;
+            state = States.CONNECTING;
             currentConnectIndex = nextAddrToTry;
             InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
             nextAddrToTry++;
@@ -1023,230 +856,172 @@ public class ClientCnxn {
                 nextAddrToTry = 0;
             }
             LOG.info("Opening socket connection to server " + addr);
-            SocketChannel sock;
-            sock = SocketChannel.open();
-            sock.configureBlocking(false);
-            sock.socket().setSoLinger(false, -1);
-            sock.socket().setTcpNoDelay(true);
+            
             setName(getName().replaceAll("\\(.*\\)",
                     "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
-            sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
-            if (sock.connect(addr)) {
-                primeConnection(sockKey);
-            }
-            initialized = false;
 
-            /*
-             * Reset incomingBuffer
-             */
-            lenBuffer.clear();
-            incomingBuffer = lenBuffer;
+            socket.connect(addr);
         }
 
         private static final String RETRY_CONN_MSG =
             ", closing socket connection and attempting reconnect";
-        
-        @Override
-        public void run() {
-            long now = System.currentTimeMillis();
-            long lastHeard = now;
-            long lastSend = now;
-            while (zooKeeper.state.isAlive()) {
-                try {
-                    if (sockKey == null) {
-                        // don't re-establish connection if we are closing
-                        if (closing) {
-                            break;
-                        }
-                        startConnect();
-                        lastSend = now;
-                        lastHeard = now;
-                    }
-                    int idleRecv = (int) (now - lastHeard);
-                    int idleSend = (int) (now - lastSend);
-                    int to = readTimeout - idleRecv;
-                    if (zooKeeper.state != States.CONNECTED) {
-                        to = connectTimeout - idleRecv;
-                    }
-                    if (to <= 0) {
-                        throw new SessionTimeoutException(
-                                "Client session timed out, have not heard from server in "
-                                + idleRecv + "ms"
-                                + " for sessionid 0x"
-                                + Long.toHexString(sessionId));
-                    }
-                    if (zooKeeper.state == States.CONNECTED) {
-                        int timeToNextPing = readTimeout/2 - idleSend;
-                        if (timeToNextPing <= 0) {
-                            sendPing();
-                            lastSend = now;
-                            enableWrite();
-                        } else {
-                            if (timeToNextPing < to) {
-                                to = timeToNextPing;
-                            }
-                        }
-                    }
-
-                    selector.select(to);
-                    Set<SelectionKey> selected;
-                    synchronized (this) {
-                        selected = selector.selectedKeys();
-                    }
-                    // Everything below and until we get back to the select is
-                    // non blocking, so time is effectively a constant. That is
-                    // Why we just have to do this once, here
-                    now = System.currentTimeMillis();
-                    for (SelectionKey k : selected) {
-                        SocketChannel sc = ((SocketChannel) k.channel());
-                        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
-                            if (sc.finishConnect()) {
-                                lastHeard = now;
-                                lastSend = now;
-                                primeConnection(k);
-                            }
-                        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-                            if (outgoingQueue.size() > 0) {
-                                // We have something to send so it's the same
-                                // as if we do the send now.
-                                lastSend = now;
-                            }
-                            if (doIO()) {
-                                lastHeard = now;
-                            }
-                        }
-                    }
-                    if (zooKeeper.state == States.CONNECTED) {
-                        if (outgoingQueue.size() > 0) {
-                            enableWrite();
-                        } else {
-                            disableWrite();
-                        }
-                    }
-                    selected.clear();
-                } catch (Exception e) {
-                    if (closing) {
-                        if (LOG.isDebugEnabled()) {
-                            // closing so this is expected
-                            LOG.debug("An exception was thrown while closing send thread for session 0x"
-                                    + Long.toHexString(getSessionId())
-                                    + " : " + e.getMessage());
-                        }
-                        break;
-                    } else {
-                        // this is ugly, you have a better way speak up
-                        if (e instanceof SessionExpiredException) {
-                            LOG.info(e.getMessage() + ", closing socket connection");
-                        } else if (e instanceof SessionTimeoutException) {
-                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
-                        } else if (e instanceof EndOfStreamException) {
-                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
-                        } else {
-                            LOG.warn("Session 0x"
-                                    + Long.toHexString(getSessionId())
-                                    + " for server "
-                                    + ((SocketChannel)sockKey.channel())
-                                        .socket().getRemoteSocketAddress()
-                                    + ", unexpected error"
-                                    + RETRY_CONN_MSG,
-                                    e);
-                        }
-                        cleanup();
-                        if (zooKeeper.state.isAlive()) {
-                            eventThread.queueEvent(new WatchedEvent(
-                                    Event.EventType.None,
-                                    Event.KeeperState.Disconnected,
-                                    null));
-                        }
 
-                        now = System.currentTimeMillis();
-                        lastHeard = now;
-                        lastSend = now;
-                    }
+        void cleanup() {
+            synchronized (pendingQueue) {
+                for (Packet p : pendingQueue) {
+                    conLossPacket(p);
                 }
+                pendingQueue.clear();
             }
-            cleanup();
-            try {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Doing client selector close");
-                }
-                selector.close();
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Closed client selector");
+            synchronized (outgoingQueue) {
+                for (Packet p : outgoingQueue) {
+                    conLossPacket(p);
                 }
-            } catch (IOException e) {
-                LOG.warn("Ignoring exception during selector close", e);
+                outgoingQueue.clear();
             }
-            if (zooKeeper.state.isAlive()) {
+        }
+
+        void onConnected(int _negotiatedSessionTimeout, long _sessionId,
+                byte[] _sessionPasswd) throws IOException {
+            negotiatedSessionTimeout = _negotiatedSessionTimeout;
+            if (negotiatedSessionTimeout <= 0) {
+                state = States.CLOSED;
+
                 eventThread.queueEvent(new WatchedEvent(
-                        Event.EventType.None,
-                        Event.KeeperState.Disconnected,
-                        null));
+                        Watcher.Event.EventType.None,
+                        Watcher.Event.KeeperState.Expired, null));
+                eventThread.queueEventOfDeath();
+                throw new SessionExpiredException(
+                        "Unable to reconnect to ZooKeeper service, session 0x"
+                        + Long.toHexString(sessionId) + " has expired");
             }
-            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
-                                     "SendThread exitedloop.");
+            readTimeout = negotiatedSessionTimeout * 2 / 3;
+            connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
+            sessionId = _sessionId;
+            sessionPasswd = _sessionPasswd;
+            state = States.CONNECTED;
+            LOG.info("Session establishment complete on server "
+                    + socket.getRemoteSocketAddress() 
+                    + ", sessionid = 0x"
+                    + Long.toHexString(sessionId)
+                    + ", negotiated timeout = " + negotiatedSessionTimeout);
+            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
+                    Watcher.Event.KeeperState.SyncConnected, null));
         }
 
-        private void cleanup() {
-            if (sockKey != null) {
-                SocketChannel sock = (SocketChannel) sockKey.channel();
-                sockKey.cancel();
-                try {
-                    sock.socket().shutdownInput();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during shutdown input", e);
-                    }
-                }
-                try {
-                    sock.socket().shutdownOutput();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during shutdown output", e);
-                    }
+        void readResponse(ByteBuffer incomingBuffer) throws IOException {
+            ByteBufferInputStream bbis = new ByteBufferInputStream(
+                    incomingBuffer);
+            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
+            ReplyHeader replyHdr = new ReplyHeader();
+
+            replyHdr.deserialize(bbia, "header");
+            if (replyHdr.getXid() == -2) {
+                // -2 is the xid for pings
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got ping response for sessionid: 0x"
+                            + Long.toHexString(sessionId)
+                            + " after "
+                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
+                            + "ms");
                 }
-                try {
-                    sock.socket().close();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during socket close", e);
-                    }
+                return;
+            }
+            if (replyHdr.getXid() == -4) {
+                // -4 is the xid for AuthPacket               
+                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
+                    state = States.AUTH_FAILED;                    
+                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
+                            Watcher.Event.KeeperState.AuthFailed, null) );            		            		
                 }
-                try {
-                    sock.close();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during channel close", e);
-                    }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got auth sessionid:0x"
+                            + Long.toHexString(sessionId));
                 }
+                return;
             }
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
+            if (replyHdr.getXid() == -1) {
+                // -1 means notification
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got notification sessionid:0x"
+                        + Long.toHexString(sessionId));
+                }
+                WatcherEvent event = new WatcherEvent();
+                event.deserialize(bbia, "response");
+
+                // convert from a server path to a client path
+                if (chrootPath != null) {
+                    String serverPath = event.getPath();
+                    if(serverPath.compareTo(chrootPath)==0)
+                        event.setPath("/");
+                    else
+                        event.setPath(serverPath.substring(chrootPath.length()));
+                }
+
+                WatchedEvent we = new WatchedEvent(event);
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("SendThread interrupted during sleep, ignoring");
+                    LOG.debug("Got " + we + " for sessionid 0x"
+                            + Long.toHexString(sessionId));
                 }
+
+                eventThread.queueEvent( we );
+                return;
             }
-            sockKey = null;
+            Packet packet;
             synchronized (pendingQueue) {
-                for (Packet p : pendingQueue) {
-                    conLossPacket(p);
+                if (pendingQueue.size() == 0) {
+                    throw new IOException("Nothing in the queue, but got "
+                            + replyHdr.getXid());
                 }
-                pendingQueue.clear();
+                packet = pendingQueue.remove();
             }
-            synchronized (outgoingQueue) {
-                for (Packet p : outgoingQueue) {
-                    conLossPacket(p);
+            /*
+             * Since requests are processed in order, we better get a response
+             * to the first request!
+             */
+            try {
+                if (packet.header.getXid() != replyHdr.getXid()) {
+                    packet.replyHeader.setErr(
+                            KeeperException.Code.CONNECTIONLOSS.intValue());
+                    throw new IOException("Xid out of order. Got Xid "
+                            + replyHdr.getXid() + " with err " +
+                            + replyHdr.getErr() +
+                            " expected Xid "
+                            + packet.header.getXid()
+                            + " for a packet with details: "
+                            + packet );
                 }
-                outgoingQueue.clear();
+
+                packet.replyHeader.setXid(replyHdr.getXid());
+                packet.replyHeader.setErr(replyHdr.getErr());
+                packet.replyHeader.setZxid(replyHdr.getZxid());
+                if (replyHdr.getZxid() > 0) {
+                    lastZxid = replyHdr.getZxid();
+                }
+                if (packet.response != null && replyHdr.getErr() == 0) {
+                    packet.response.deserialize(bbia, "response");
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Reading reply sessionid:0x"
+                            + Long.toHexString(sessionId) + ", packet:: " + packet);
+                }
+            } finally {
+                finishPacket(packet);
             }
         }
 
-        public void close() {
-            zooKeeper.state = States.CLOSED;
-            synchronized (this) {
-                selector.wakeup();
+        void close() {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("close called sessionId:0x"
+                        + Long.toHexString(sessionId));
             }
+            state = States.CLOSED;
+            socket.wakeupCnxn();
+        }
+
+        void testableCloseSocket() throws IOException {
+            socket.testableCloseSocket();
         }
     }
 
@@ -1292,6 +1067,8 @@ public class ClientCnxn {
 
     private int xid = 1;
 
+    private volatile States state;
+
     synchronized private int getXid() {
         return xid++;
     }
@@ -1325,7 +1102,7 @@ public class ClientCnxn {
             packet.ctx = ctx;
             packet.clientPath = clientPath;
             packet.serverPath = serverPath;
-            if (!zooKeeper.state.isAlive() || closing) {
+            if (!state.isAlive() || closing) {
                 conLossPacket(packet);
             } else {
                 // If the client is asking to close the session then
@@ -1336,19 +1113,23 @@ public class ClientCnxn {
                 outgoingQueue.add(packet);
             }
         }
-        synchronized (sendThread) {
-            selector.wakeup();
-        }
+        sendThread.getSocket().wakeupCnxn();
         return packet;
     }
 
     public void addAuthInfo(String scheme, byte auth[]) {
-        if (!zooKeeper.state.isAlive()) {
+        if (!state.isAlive()) {
             return;
         }
-        authInfo.add(new AuthData(scheme, auth));
+        synchronized (authInfo) {
+            authInfo.add(new AuthData(scheme, auth));
+        }
         queuePacket(new RequestHeader(-4, OpCode.auth), null,
                 new AuthPacket(0, scheme, auth), null, null, null, null,
                 null, null);
     }
+
+    States getState() {
+        return state;
+    }
 }

Added: hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java?rev=1022642&view=auto
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java (added)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java Thu Oct 14 18:45:47 2010
@@ -0,0 +1,126 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.proto.ConnectResponse;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+
+abstract class ClientCnxnSocket {
+    private static final Logger LOG = Logger.getLogger(ClientCnxnSocket.class);
+
+    protected boolean initialized;
+
+    protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
+
+    protected ByteBuffer incomingBuffer = lenBuffer;
+    protected long sentCount = 0;
+    protected long recvCount = 0;
+    protected long lastHeard;
+    protected long lastSend;
+    protected long now;
+    protected ClientCnxn.SendThread sendThread;
+    protected LinkedList<ClientCnxn.Packet> outgoingQueue;
+    /**
+     * The sessionId is only available here for Log and Exception messages.
+     * Otherwise the socket doesn't know it.
+     */
+    protected long sessionId;
+
+    void introduce(ClientCnxn.SendThread sendThread,
+            LinkedList<ClientCnxn.Packet> outgoingQueue, long sessionId) {
+        this.sendThread = sendThread;
+        this.outgoingQueue = outgoingQueue;
+        this.sessionId = sessionId;
+    }
+
+    void updateNow() {
+        now = System.currentTimeMillis();
+    }
+
+    int getIdleRecv() {
+        return (int) (now - lastHeard);
+    }
+
+    int getIdleSend() {
+        return (int) (now - lastSend);
+    }
+
+    long getSentCount() {
+        return sentCount;
+    }
+
+    long getRecvCount() {
+        return recvCount;
+    }
+
+    void updateLastHeard() {
+        this.lastHeard = now;
+    }
+
+    void updateLastSend() {
+        this.lastSend = now;
+    }
+
+    void updateLastSendAndHeard() {
+        this.lastSend = now;
+        this.lastHeard = now;
+    }
+
+    protected void readLength() throws IOException { // consumes the int length prefix
+        int len = incomingBuffer.getInt();
+        if (len < 0 || len >= ClientCnxn.packetLen) { // reject before allocating
+            throw new IOException("Packet len " + len + " is out of range!");
+        }
+        incomingBuffer = ByteBuffer.allocate(len); // subsequent reads fill the body
+    }
+
+    void readConnectResult() throws IOException { // decodes the ConnectResponse
+        if (LOG.isTraceEnabled()) { // NOTE(review): dump covers the whole buffer, incl. passwd bytes
+            StringBuilder buf = new StringBuilder("0x[");
+            for (byte b : incomingBuffer.array()) {
+                buf.append(Integer.toHexString(b & 0xff)).append(','); // mask: no sign extension
+            }
+            buf.append("]");
+            LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
+                    + buf.toString());
+        }
+        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
+        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
+        ConnectResponse conRsp = new ConnectResponse();
+        conRsp.deserialize(bbia, "connect");
+        this.sessionId = conRsp.getSessionId(); // kept locally for log/exception text
+        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
+                conRsp.getPasswd());
+    }
+
+    abstract boolean isConnected();
+
+    abstract void connect(InetSocketAddress addr) throws IOException;
+
+    abstract SocketAddress getRemoteSocketAddress();
+
+    abstract SocketAddress getLocalSocketAddress();
+
+    abstract void cleanup();
+
+    abstract void close();
+
+    abstract void wakeupCnxn();
+
+    abstract void enableWrite();
+
+    abstract void enableReadWriteOnly();
+
+    abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue)
+            throws IOException;
+
+    abstract void testableCloseSocket() throws IOException;
+}

Added: hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1022642&view=auto
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (added)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Thu Oct 14 18:45:47 2010
@@ -0,0 +1,303 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooKeeper.States;
+
+public class ClientCnxnSocketNIO extends ClientCnxnSocket {
+    private static final Logger LOG = Logger
+            .getLogger(ClientCnxnSocketNIO.class);
+
+    private final Selector selector = Selector.open();
+
+    private SelectionKey sockKey;
+
+    ClientCnxnSocketNIO() throws IOException {
+        super();
+    }
+
+    @Override
+    boolean isConnected() {
+        return sockKey != null;
+    }
+    
+    /**
+     * @param pendingQueue queue that fully written, non-ping, non-auth packets are added to
+     * @return true if a packet was received
+     * @throws IOException
+     */
+    boolean doIO(List<Packet> pendingQueue) throws IOException {
+        boolean packetReceived = false;
+        SocketChannel sock = (SocketChannel) sockKey.channel();
+        if (sock == null) {
+            throw new IOException("Socket is null!");
+        }
+        if (sockKey.isReadable()) {
+            int rc = sock.read(incomingBuffer);
+            if (rc < 0) {
+                throw new EndOfStreamException(
+                        "Unable to read additional data from server sessionid 0x"
+                        + Long.toHexString(sessionId)
+                        + ", likely server has closed socket");
+            }
+            if (!incomingBuffer.hasRemaining()) {
+                incomingBuffer.flip();
+                if (incomingBuffer == lenBuffer) {
+                    recvCount++;
+                    readLength();
+                } else if (!initialized) {
+                    readConnectResult();
+                    enableRead();
+                    if (!outgoingQueue.isEmpty()) {
+                        enableWrite();
+                    }
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+                    packetReceived = true;
+                    initialized = true;
+                } else {
+                    sendThread.readResponse(incomingBuffer);
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+                    packetReceived = true;
+                }
+            }
+        }
+        if (sockKey.isWritable()) {
+            synchronized (outgoingQueue) {
+                if (!outgoingQueue.isEmpty()) {
+                    ByteBuffer pbb = outgoingQueue.getFirst().bb;
+                    sock.write(pbb);
+                    if (!pbb.hasRemaining()) {
+                        sentCount++;
+                        Packet p = outgoingQueue.removeFirst();
+                        if (p.header != null
+                                && p.header.getType() != OpCode.ping
+                                && p.header.getType() != OpCode.auth) {
+                            pendingQueue.add(p);
+                        }
+                    }
+                }
+            }
+        }
+        if (outgoingQueue.isEmpty()) {
+            disableWrite();
+        } else {
+            enableWrite();
+        }
+        return packetReceived;
+    }
+
+    @Override
+    void cleanup() {
+        if (sockKey != null) {
+            SocketChannel sock = (SocketChannel) sockKey.channel();
+            sockKey.cancel();
+            try {
+                sock.socket().shutdownInput();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during shutdown input", e);
+                }
+            }
+            try {
+                sock.socket().shutdownOutput();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during shutdown output", e);
+                }
+            }
+            try {
+                sock.socket().close();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during socket close", e);
+                }
+            }
+            try {
+                sock.close();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during channel close", e);
+                }
+            }
+        }
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("SendThread interrupted during sleep, ignoring");
+            }
+        }
+        sockKey = null;
+        sendThread.cleanup();
+    }
+
+    @Override
+    void close() {
+        try {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Doing client selector close");
+            }
+            selector.close();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Closed client selector");
+            }
+        } catch (IOException e) {
+            LOG.warn("Ignoring exception during selector close", e);
+        }
+    }
+
+    @Override
+    void connect(InetSocketAddress addr) throws IOException {
+        SocketChannel sock;
+        sock = SocketChannel.open();
+        sock.configureBlocking(false);
+        sock.socket().setSoLinger(false, -1);
+        sock.socket().setTcpNoDelay(true);
+        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
+        if (sock.connect(addr)) {
+            sendThread.primeConnection();
+        }
+        initialized = false;
+
+        /*
+         * Reset incomingBuffer
+         */
+        lenBuffer.clear();
+        incomingBuffer = lenBuffer;
+    }
+
+    /**
+     * Returns the address to which the socket is connected.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    SocketAddress getRemoteSocketAddress() {
+        // a lot could go wrong here, so rather than put in a bunch of code
+        // to check for nulls all down the chain let's do it the simple
+        // yet bulletproof way
+        try {
+            return ((SocketChannel) sockKey.channel()).socket()
+                    .getRemoteSocketAddress();
+        } catch (NullPointerException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the local address to which the socket is bound.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    SocketAddress getLocalSocketAddress() {
+        // a lot could go wrong here, so rather than put in a bunch of code
+        // to check for nulls all down the chain let's do it the simple
+        // yet bulletproof way
+        try {
+            return ((SocketChannel) sockKey.channel()).socket()
+                    .getLocalSocketAddress();
+        } catch (NullPointerException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Wakes up the selector so that a blocked select() returns. The wakeup
+     * is issued while holding this object's monitor.
+     */
+    @Override
+    synchronized void wakeupCnxn() {
+        selector.wakeup();
+    }
+
+    @Override
+    void doTransport(int waitTimeOut, List<Packet> pendingQueue)
+            throws IOException {
+        selector.select(waitTimeOut);
+
+        Set<SelectionKey> selected;
+        synchronized (this) {
+            selected = selector.selectedKeys();
+        }
+        // Everything below and until we get back to the select is
+        // non blocking, so time is effectively a constant. That is
+        // Why we just have to do this once, here
+        updateNow();
+
+        for (SelectionKey k : selected) {
+            SocketChannel sc = ((SocketChannel) k.channel());
+            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
+                if (sc.finishConnect()) {
+                    updateLastSendAndHeard();
+                    sendThread.primeConnection();
+                }
+            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
+                if (outgoingQueue.size() > 0) {
+                    // We have something to send so it's the same
+                    // as if we do the send now.
+                    updateLastSend();
+                }
+                if (doIO(pendingQueue)) {
+                    updateLastHeard();
+                }
+            }
+        }
+        if (sendThread.getZkState() == States.CONNECTED) {
+            if (outgoingQueue.size() > 0) {
+                enableWrite();
+            } else {
+                disableWrite();
+            }
+        }
+        selected.clear();
+    }
+
+    /**
+     * Test hook: closes the socket underneath the channel registered with
+     * the current selection key.
+     *
+     * @throws IOException if closing the socket fails
+     */
+    @Override
+    void testableCloseSocket() throws IOException {
+        LOG.info("testableCloseSocket() called");
+        SocketChannel registered = (SocketChannel) sockKey.channel();
+        registered.socket().close();
+    }
+
+    @Override
+    synchronized void enableWrite() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_WRITE) == 0) {
+            sockKey.interestOps(i | SelectionKey.OP_WRITE);
+        }
+    }
+
+    synchronized private void disableWrite() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_WRITE) != 0) {
+            sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
+        }
+    }
+
+    synchronized private void enableRead() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_READ) == 0) {
+            sockKey.interestOps(i | SelectionKey.OP_READ);
+        }
+    }
+
+    /**
+     * Replaces the key's interest set with exactly OP_READ | OP_WRITE.
+     * <p>
+     * Synchronized on this object like enableWrite(), disableWrite() and
+     * enableRead(): those perform a read-modify-write of the interest set,
+     * and an unsynchronized write here could be overwritten by (or
+     * overwrite) one of them.
+     */
+    @Override
+    synchronized void enableReadWriteOnly() {
+        sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+    }
+}

Added: hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java?rev=1022642&view=auto
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java (added)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java Thu Oct 14 18:45:47 2010
@@ -0,0 +1,299 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelState;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+/**
+ * {@link ClientCnxnSocket} implementation that talks to the server through a
+ * Netty {@link Channel}.
+ * <p>
+ * Connection events and inbound data are delivered by Netty to the inner
+ * {@code ZKClientHandler}; {@link #doTransport(int, List)} drains
+ * {@code outgoingQueue} into the channel and then waits on that queue until
+ * {@link #wakeupCnxn()} notifies it or the timeout elapses.
+ */
+public class ClientCnxnSocketNetty extends ClientCnxnSocket {
+    private static final Logger LOG = Logger
+            .getLogger(ClientCnxnSocketNetty.class);
+
+    // Assigned from ZKClientHandler callbacks (invoked by Netty) and read by
+    // whichever thread drives doTransport()/cleanup(); volatile so that the
+    // assignment is visible across those threads.
+    private volatile Channel channel;
+
+    private ChannelFactory factory;
+
+    // Set from ZKClientHandler.channelDisconnected() and polled in
+    // doTransport(); volatile for the same cross-thread visibility reason.
+    private volatile boolean disconnected;
+
+    @Override
+    boolean isConnected() {
+        return channel != null;
+    }
+
+    /**
+     * Writes queued packets to the channel for as long as the channel reports
+     * itself writable. Packets that carry a header other than ping/auth are
+     * appended to {@code pendingQueue}.
+     *
+     * @param pendingQueue queue receiving packets that were written and have
+     *        a non-ping, non-auth header
+     * @return true if at least one packet was written
+     */
+    private boolean doWrites(List<Packet> pendingQueue) {
+        final Channel ch = channel;
+        if (ch == null) {
+            return false;
+        }
+        boolean written = false;
+        while (ch.isWritable()) {
+            Packet p;
+            // The emptiness check and the removal happen under the same lock
+            // so the queue cannot change between the two.
+            synchronized (outgoingQueue) {
+                if (outgoingQueue.isEmpty()) {
+                    break;
+                }
+                p = outgoingQueue.removeFirst();
+            }
+
+            ByteBuffer pbb = p.bb;
+            ChannelFuture write;
+            synchronized (pendingQueue) {
+                write = ch.write(ChannelBuffers.copiedBuffer(pbb));
+                // mark the packet's buffer as fully consumed
+                pbb.position(pbb.limit());
+                if (p.header != null && p.header.getType() != OpCode.ping
+                        && p.header.getType() != OpCode.auth) {
+                    pendingQueue.add(p);
+                }
+            }
+            if (p.header != null && p.header.getType() == OpCode.closeSession) {
+                // ensure that the close session is sent before
+                // we close the channel
+                write.awaitUninterruptibly();
+            }
+
+            written = true;
+            sentCount++;
+        }
+        return written;
+    }
+
+    /**
+     * Starts an asynchronous connect to {@code addr}. The channel becomes
+     * available once {@code ZKClientHandler.channelConnected} has run.
+     *
+     * @param addr the server address to connect to
+     */
+    @Override
+    void connect(InetSocketAddress addr) throws IOException {
+        factory = new NioClientSocketChannelFactory(
+                Executors.newCachedThreadPool(),
+                Executors.newCachedThreadPool());
+
+        ClientBootstrap bootstrap = new ClientBootstrap(factory);
+
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            @Override
+            public ChannelPipeline getPipeline() {
+                return Channels.pipeline(new ZKClientHandler());
+            }
+        });
+
+        bootstrap.setOption("soLinger", -1);
+        bootstrap.setOption("tcpNoDelay", true);
+
+        disconnected = false;
+        bootstrap.connect(addr);
+    }
+
+    /** No-op in this implementation. */
+    @Override
+    void enableReadWriteOnly() {
+
+    }
+
+    /**
+     * Returns the address to which the socket is connected.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    public SocketAddress getRemoteSocketAddress() {
+        final Channel ch = channel;
+        if (ch == null) {
+            return null;
+        }
+
+        return ch.getRemoteAddress();
+    }
+
+    /**
+     * Returns the local address to which the socket is bound.
+     * 
+     * @return ip address of the local side of the connection or null if not
+     *         connected
+     */
+    @Override
+    SocketAddress getLocalSocketAddress() {
+        final Channel ch = channel;
+        if (ch == null) {
+            return null;
+        }
+
+        return ch.getLocalAddress();
+    }
+
+    /**
+     * Closes the channel (waiting for the close to complete), releases the
+     * channel factory's resources and then delegates to
+     * {@code sendThread.cleanup()}.
+     */
+    @Override
+    void cleanup() {
+        final Channel ch = channel;
+        if (ch != null) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("cleanup closing sessionId:0x"
+                        + Long.toHexString(sessionId));
+            }
+            ch.close().awaitUninterruptibly();
+        }
+        channel = null;
+        if (factory != null) {
+            factory.releaseExternalResources();
+            // drop the reference so a repeated cleanup() does not release
+            // the same factory again
+            factory = null;
+        }
+        sendThread.cleanup();
+    }
+
+    @Override
+    void close() {
+        // NO-OP
+    }
+
+    /**
+     * Test hook: disconnects the channel and waits for the disconnect to
+     * complete. Does nothing beyond logging when no channel exists yet.
+     */
+    @Override
+    void testableCloseSocket() throws IOException {
+        LOG.info("testableCloseSocket() called");
+        final Channel ch = channel;
+        if (ch == null) {
+            return;
+        }
+        ch.disconnect().awaitUninterruptibly();
+    }
+
+    /** Notifies every thread waiting on {@code outgoingQueue}. */
+    @Override
+    void wakeupCnxn() {
+        synchronized (outgoingQueue) {
+            outgoingQueue.notifyAll();
+        }
+    }
+
+    /** No-op in this implementation. */
+    @Override
+    void enableWrite() {
+
+    }
+
+    /**
+     * Writes whatever is queued and then waits on {@code outgoingQueue} for
+     * at most {@code waitTimeOut} milliseconds.
+     * <p>
+     * The wait is skipped when, checked under the queue's lock, there is
+     * already something to act on (a disconnect was flagged, or packets are
+     * queued and the channel is writable). Without that check a notification
+     * sent between the write pass and the wait would be lost and the caller
+     * would sleep for the whole timeout.
+     *
+     * @param waitTimeOut maximum time to wait, in milliseconds
+     * @param pendingQueue queue handed through to the write pass
+     * @throws EndOfStreamException if the channel has been disconnected
+     */
+    @Override
+    void doTransport(int waitTimeOut, List<Packet> pendingQueue)
+            throws EndOfStreamException {
+        if (disconnected) {
+            throw new EndOfStreamException("connection for sessionid 0x"
+                    + Long.toHexString(sessionId)
+                    + " lost, likely server has closed socket");
+
+        }
+
+        // channel may not have been connected yet
+        if (isConnected() && doWrites(pendingQueue)) {
+            updateLastSend();
+        }
+
+        if (sendThread.getZkState().isAlive()) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("WAIT to=" + waitTimeOut + " sessionId:0x"
+                        + Long.toHexString(sessionId));
+            }
+            try {
+                synchronized (outgoingQueue) {
+                    final Channel ch = channel;
+                    boolean workPending = disconnected
+                            || (ch != null && !outgoingQueue.isEmpty()
+                                    && ch.isWritable());
+                    if (!workPending) {
+                        outgoingQueue.wait(waitTimeOut);
+                    }
+                }
+            } catch (InterruptedException e) {
+                // Deliberately not re-interrupting: with the flag left set,
+                // every following wait() would throw immediately.
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("WOKE via interrupt sessionId:0x"
+                            + Long.toHexString(sessionId));
+                }
+            } finally {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("WOKE sessionId:0x" + Long.toHexString(sessionId));
+                }
+            }
+
+        }
+        // Everything below and until we get back to the wait is
+        // non blocking, so time is effectively a constant. That is
+        // Why we just have to do this once, here
+        updateNow();
+    }
+
+    /**
+     * Netty handler that feeds channel events back into the enclosing
+     * socket: it records connect/disconnect, reassembles inbound bytes into
+     * {@code incomingBuffer} and wakes the transport loop when needed.
+     */
+    private class ZKClientHandler extends SimpleChannelHandler {
+
+        @Override
+        public void channelDisconnected(ChannelHandlerContext ctx,
+                ChannelStateEvent e) throws Exception {
+            // flag first, then wake: doTransport() checks the flag
+            disconnected = true;
+            wakeupCnxn();
+        }
+
+        @Override
+        public void channelConnected(ChannelHandlerContext ctx,
+                ChannelStateEvent e) throws Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Channel connected " + e);
+            }
+            channel = ctx.getChannel();
+
+            long now = System.currentTimeMillis();
+
+            lastHeard = now;
+            lastSend = now;
+
+            sendThread.primeConnection();
+
+            initialized = false;
+
+            /*
+             * Reset incomingBuffer
+             */
+            lenBuffer.clear();
+            incomingBuffer = lenBuffer;
+
+            wakeupCnxn();
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+                throws IOException {
+            lastHeard = System.currentTimeMillis();
+
+            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+            while (buf.readable()) {
+                // Never ask for more bytes than buf holds: shrink the limit
+                // temporarily so readBytes() fills only what is available.
+                if (incomingBuffer.remaining() > buf.readableBytes()) {
+                    int newLimit = incomingBuffer.position()
+                            + buf.readableBytes();
+                    incomingBuffer.limit(newLimit);
+                }
+                buf.readBytes(incomingBuffer);
+                incomingBuffer.limit(incomingBuffer.capacity());
+
+                if (!incomingBuffer.hasRemaining()) {
+                    incomingBuffer.flip();
+                    if (incomingBuffer == lenBuffer) {
+                        // length prefix complete; readLength() is declared
+                        // outside this class
+                        recvCount++;
+                        readLength();
+                    } else if (!initialized) {
+                        // first full message after connecting
+                        readConnectResult();
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+                        initialized = true;
+                    } else {
+                        synchronized (outgoingQueue) {
+                            sendThread.readResponse(incomingBuffer);
+                        }
+
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void channelInterestChanged(ChannelHandlerContext ctx,
+                ChannelStateEvent e) {
+            if (e.getState() == ChannelState.INTEREST_OPS) {
+                // handle the case where OP_WRITE changes
+                wakeupCnxn();
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+            LOG.warn("Exception caught " + e, e.getCause());
+        }
+    }
+}

Modified: hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1022642&r1=1022641&r2=1022642&view=diff
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ZooKeeper.java Thu Oct 14 18:45:47 2010
@@ -105,6 +105,7 @@ import org.apache.zookeeper.server.DataT
  */
 public class ZooKeeper {
     private static final Logger LOG;
+    public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
 
     static {
         LOG = Logger.getLogger(ZooKeeper.class);
@@ -154,6 +155,7 @@ public class ZooKeeper {
         /* (non-Javadoc)
          * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType, java.lang.String)
          */
+        @Override
         public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                         Watcher.Event.EventType type,
                                         String clientPath)
@@ -322,8 +324,6 @@ public class ZooKeeper {
         }
     }
 
-    volatile States state;
-
     protected final ClientCnxn cnxn;
 
     /**
@@ -376,7 +376,8 @@ public class ZooKeeper {
                 + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
 
         watchManager.defaultWatcher = watcher;
-        cnxn = new ClientCnxn(connectString, sessionTimeout, this, watchManager);
+        cnxn = new ClientCnxn(connectString, sessionTimeout, this,
+                watchManager, getClientCnxnSocket());
         cnxn.start();
     }
 
@@ -443,8 +444,8 @@ public class ZooKeeper {
                 + (sessionPasswd == null ? "<null>" : "<hidden>"));
 
         watchManager.defaultWatcher = watcher;
-        cnxn = new ClientCnxn(connectString, sessionTimeout, this, watchManager,
-                sessionId, sessionPasswd);
+        cnxn = new ClientCnxn(connectString, sessionTimeout, this,
+                watchManager, getClientCnxnSocket(), sessionId, sessionPasswd);
         cnxn.start();
     }
 
@@ -518,7 +519,7 @@ public class ZooKeeper {
      * @throws InterruptedException
      */
     public synchronized void close() throws InterruptedException {
-        if (!state.isAlive()) {
+        if (!cnxn.getState().isAlive()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Close called on already closed client");
             }
@@ -1557,7 +1558,7 @@ public class ZooKeeper {
     }
 
     public States getState() {
-        return state;
+        return cnxn.getState();
     }
 
     /**
@@ -1617,7 +1618,7 @@ public class ZooKeeper {
      *         not connected
      */
     protected SocketAddress testableRemoteSocketAddress() {
-        return cnxn.getRemoteSocketAddress();
+        return cnxn.sendThread.getSocket().getRemoteSocketAddress();
     }
 
     /** 
@@ -1630,6 +1631,23 @@ public class ZooKeeper {
      *         not connected
      */
     protected SocketAddress testableLocalSocketAddress() {
-        return cnxn.getLocalSocketAddress();
+        return cnxn.sendThread.getSocket().getLocalSocketAddress();
+    }
+
+    /**
+     * Instantiates the client socket implementation named by the
+     * {@link #ZOOKEEPER_CLIENT_CNXN_SOCKET} system property, using
+     * {@link ClientCnxnSocketNIO} when the property is not set.
+     *
+     * @return a new instance of the selected implementation
+     * @throws IOException if the class cannot be loaded, instantiated or
+     *         cast; the underlying exception is attached as the cause
+     */
+    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
+        String implName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        if (implName == null) {
+            implName = ClientCnxnSocketNIO.class.getName();
+        }
+        try {
+            Object instance = Class.forName(implName).newInstance();
+            return (ClientCnxnSocket) instance;
+        } catch (Exception e) {
+            throw new IOException("Couldn't instantiate " + implName, e);
+        }
     }
 }

Modified: hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=1022642&r1=1022641&r2=1022642&view=diff
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Thu Oct 14 18:45:47 2010
@@ -653,7 +653,7 @@ public class ZooKeeperMain {
         } 
         
         // Below commands all need a live connection
-        if (zk == null || !zk.state.isAlive()) {
+        if (zk == null || !zk.getState().isAlive()) {
             System.out.println("Not connected");
             return false;
         }

Modified: hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java?rev=1022642&r1=1022641&r2=1022642&view=diff
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java (original)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java Thu Oct 14 18:45:47 2010
@@ -24,7 +24,6 @@ import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.io.Writer;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;

Modified: hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java?rev=1022642&r1=1022641&r2=1022642&view=diff
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java (original)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java Thu Oct 14 18:45:47 2010
@@ -34,7 +34,6 @@ import org.jboss.netty.buffer.ChannelBuf
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
@@ -61,7 +60,6 @@ public class NettyServerCnxnFactory exte
      * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
      * this class gets access to the member variables and methods.
      */
-    @ChannelPipelineCoverage("all")
     class CnxnChannelHandler extends SimpleChannelHandler {
 
         @Override

Modified: hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/config/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/config/findbugsExcludeFile.xml?rev=1022642&r1=1022641&r2=1022642&view=diff
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/config/findbugsExcludeFile.xml (original)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/config/findbugsExcludeFile.xml Thu Oct 14 18:45:47 2010
@@ -27,7 +27,12 @@
   <!-- We want to catch all exceptions and cleanup, regardless of source
        (incl runtime) -->
   <Match>
-    <Class name="org.apache.zookeeper.ClientCnxn$SendThread" />
+    <Class name="org.apache.zookeeper.NIOClientCnxn$NIOSendThread" />
+    <Method name="run" />
+    <Bug pattern="REC_CATCH_EXCEPTION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.zookeeper.NettyClientCnxn$NettySendThread" />
     <Method name="run" />
     <Bug pattern="REC_CATCH_EXCEPTION" />
   </Match>
@@ -79,6 +84,14 @@
     <Class name="org.apache.zookeeper.ClientCnxn"/>
       <Bug code="EI, EI2" />
   </Match>
+  <Match>
+    <Class name="org.apache.zookeeper.NIOClientCnxn"/>
+      <Bug code="EI, EI2" />
+  </Match>
+  <Match>
+    <Class name="org.apache.zookeeper.NettyClientCnxn"/>
+      <Bug code="EI, EI2" />
+  </Match>
 
   <Match>
     <Class name="org.apache.zookeeper.server.DataNode"/>

Modified: hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/TestableZooKeeper.java?rev=1022642&r1=1022641&r2=1022642&view=diff
==============================================================================
--- hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/TestableZooKeeper.java (original)
+++ hadoop/zookeeper/branches/ZOOKEEPER-823/src/java/test/org/apache/zookeeper/TestableZooKeeper.java Thu Oct 14 18:45:47 2010
@@ -59,7 +59,7 @@ public class TestableZooKeeper extends Z
                 synchronized(cnxn) {
                     try {
                         try {
-                            ((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close();
+                            cnxn.sendThread.testableCloseSocket();
                         } catch (IOException e) {
                             e.printStackTrace();
                         }



Mime
View raw message