zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [22/51] [partial] zookeeper git commit: ZOOKEEPER-3032: MAVEN MIGRATION - zookeeper-server
Date Fri, 19 Oct 2018 12:40:20 GMT
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
new file mode 100644
index 0000000..f17a819
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClientCnxnSocketNIO extends ClientCnxnSocket {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(ClientCnxnSocketNIO.class);
+
+    private final Selector selector = Selector.open();
+
+    private SelectionKey sockKey;
+
+    private SocketAddress localSocketAddress;
+
+    private SocketAddress remoteSocketAddress;
+
+    ClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException {
+        this.clientConfig = clientConfig;
+        initProperties();
+    }
+
+    @Override
+    boolean isConnected() {
+        return sockKey != null;
+    }
+    
+    /**
+     * @return true if a packet was received
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
+      throws InterruptedException, IOException {
+        SocketChannel sock = (SocketChannel) sockKey.channel();
+        if (sock == null) {
+            throw new IOException("Socket is null!");
+        }
+        if (sockKey.isReadable()) {
+            int rc = sock.read(incomingBuffer);
+            if (rc < 0) {
+                throw new EndOfStreamException(
+                        "Unable to read additional data from server sessionid 0x"
+                                + Long.toHexString(sessionId)
+                                + ", likely server has closed socket");
+            }
+            if (!incomingBuffer.hasRemaining()) {
+                incomingBuffer.flip();
+                if (incomingBuffer == lenBuffer) {
+                    recvCount++;
+                    readLength();
+                } else if (!initialized) {
+                    readConnectResult();
+                    enableRead();
+                    if (findSendablePacket(outgoingQueue,
+                            sendThread.tunnelAuthInProgress()) != null) {
+                        // Since SASL authentication has completed (if client is configured to do so),
+                        // outgoing packets waiting in the outgoingQueue can now be sent.
+                        enableWrite();
+                    }
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+                    updateLastHeard();
+                    initialized = true;
+                } else {
+                    sendThread.readResponse(incomingBuffer);
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+                    updateLastHeard();
+                }
+            }
+        }
+        if (sockKey.isWritable()) {
+            Packet p = findSendablePacket(outgoingQueue,
+                    sendThread.tunnelAuthInProgress());
+
+            if (p != null) {
+                updateLastSend();
+                // If we already started writing p, p.bb will already exist
+                if (p.bb == null) {
+                    if ((p.requestHeader != null) &&
+                            (p.requestHeader.getType() != OpCode.ping) &&
+                            (p.requestHeader.getType() != OpCode.auth)) {
+                        p.requestHeader.setXid(cnxn.getXid());
+                    }
+                    p.createBB();
+                }
+                sock.write(p.bb);
+                if (!p.bb.hasRemaining()) {
+                    sentCount++;
+                    outgoingQueue.removeFirstOccurrence(p);
+                    if (p.requestHeader != null
+                            && p.requestHeader.getType() != OpCode.ping
+                            && p.requestHeader.getType() != OpCode.auth) {
+                        synchronized (pendingQueue) {
+                            pendingQueue.add(p);
+                        }
+                    }
+                }
+            }
+            if (outgoingQueue.isEmpty()) {
+                // No more packets to send: turn off write interest flag.
+                // Will be turned on later by a later call to enableWrite(),
+                // from within ZooKeeperSaslClient (if client is configured
+                // to attempt SASL authentication), or in either doIO() or
+                // in doTransport() if not.
+                disableWrite();
+            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
+                // On initial connection, write the complete connect request
+                // packet, but then disable further writes until after
+                // receiving a successful connection response.  If the
+                // session is expired, then the server sends the expiration
+                // response and immediately closes its end of the socket.  If
+                // the client is simultaneously writing on its end, then the
+                // TCP stack may choose to abort with RST, in which case the
+                // client would never receive the session expired event.  See
+                // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
+                disableWrite();
+            } else {
+                // Just in case
+                enableWrite();
+            }
+        }
+    }
+
+    private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue,
+                                      boolean tunneledAuthInProgres) {
+        if (outgoingQueue.isEmpty()) {
+            return null;
+        }
+        // If we've already starting sending the first packet, we better finish
+        if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) {
+            return outgoingQueue.getFirst();
+        }
+        // Since client's authentication with server is in progress,
+        // send only the null-header packet queued by primeConnection().
+        // This packet must be sent so that the SASL authentication process
+        // can proceed, but all other packets should wait until
+        // SASL authentication completes.
+        Iterator<Packet> iter = outgoingQueue.iterator();
+        while (iter.hasNext()) {
+            Packet p = iter.next();
+            if (p.requestHeader == null) {
+                // We've found the priming-packet. Move it to the beginning of the queue.
+                iter.remove();
+                outgoingQueue.addFirst(p);
+                return p;
+            } else {
+                // Non-priming packet: defer it until later, leaving it in the queue
+                // until authentication completes.
+                LOG.debug("deferring non-priming packet {} until SASL authentation completes.", p);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    void cleanup() {
+        if (sockKey != null) {
+            SocketChannel sock = (SocketChannel) sockKey.channel();
+            sockKey.cancel();
+            try {
+                sock.socket().shutdownInput();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during shutdown input", e);
+                }
+            }
+            try {
+                sock.socket().shutdownOutput();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during shutdown output",
+                            e);
+                }
+            }
+            try {
+                sock.socket().close();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during socket close", e);
+                }
+            }
+            try {
+                sock.close();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during channel close", e);
+                }
+            }
+        }
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("SendThread interrupted during sleep, ignoring");
+            }
+        }
+        sockKey = null;
+    }
+ 
+    @Override
+    void close() {
+        try {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Doing client selector close");
+            }
+            selector.close();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Closed client selector");
+            }
+        } catch (IOException e) {
+            LOG.warn("Ignoring exception during selector close", e);
+        }
+    }
+    
+    /**
+     * create a socket channel.
+     * @return the created socket channel
+     * @throws IOException
+     */
+    SocketChannel createSock() throws IOException {
+        SocketChannel sock;
+        sock = SocketChannel.open();
+        sock.configureBlocking(false);
+        sock.socket().setSoLinger(false, -1);
+        sock.socket().setTcpNoDelay(true);
+        return sock;
+    }
+
+    /**
+     * register with the selection and connect
+     * @param sock the {@link SocketChannel} 
+     * @param addr the address of remote host
+     * @throws IOException
+     */
+    void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
+    throws IOException {
+        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
+        boolean immediateConnect = sock.connect(addr);
+        if (immediateConnect) {
+            sendThread.primeConnection();
+        }
+    }
+    
+    @Override
+    void connect(InetSocketAddress addr) throws IOException {
+        SocketChannel sock = createSock();
+        try {
+           registerAndConnect(sock, addr);
+      } catch (IOException e) {
+            LOG.error("Unable to open socket to " + addr);
+            sock.close();
+            throw e;
+        }
+        initialized = false;
+
+        /*
+         * Reset incomingBuffer
+         */
+        lenBuffer.clear();
+        incomingBuffer = lenBuffer;
+    }
+
+    /**
+     * Returns the address to which the socket is connected.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    SocketAddress getRemoteSocketAddress() {
+        return remoteSocketAddress;
+    }
+
+    /**
+     * Returns the local address to which the socket is bound.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    SocketAddress getLocalSocketAddress() {
+        return localSocketAddress;
+    }
+    
+    private void updateSocketAddresses() {
+        Socket socket = ((SocketChannel) sockKey.channel()).socket();
+        localSocketAddress = socket.getLocalSocketAddress();
+        remoteSocketAddress = socket.getRemoteSocketAddress();
+    }
+
+    @Override
+    void packetAdded() {
+        wakeupCnxn();
+    }
+
+    @Override
+    void onClosing() {
+        wakeupCnxn();
+    }
+
+    private synchronized void wakeupCnxn() {
+        selector.wakeup();
+    }
+    
+    @Override
+    void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
+            throws IOException, InterruptedException {
+        selector.select(waitTimeOut);
+        Set<SelectionKey> selected;
+        synchronized (this) {
+            selected = selector.selectedKeys();
+        }
+        // Everything below and until we get back to the select is
+        // non blocking, so time is effectively a constant. That is
+        // Why we just have to do this once, here
+        updateNow();
+        for (SelectionKey k : selected) {
+            SocketChannel sc = ((SocketChannel) k.channel());
+            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
+                if (sc.finishConnect()) {
+                    updateLastSendAndHeard();
+                    updateSocketAddresses();
+                    sendThread.primeConnection();
+                }
+            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
+                doIO(pendingQueue, cnxn);
+            }
+        }
+        if (sendThread.getZkState().isConnected()) {
+            if (findSendablePacket(outgoingQueue,
+                    sendThread.tunnelAuthInProgress()) != null) {
+                enableWrite();
+            }
+        }
+        selected.clear();
+    }
+
+    //TODO should this be synchronized?
+    @Override
+    void testableCloseSocket() throws IOException {
+        LOG.info("testableCloseSocket() called");
+        // sockKey may be concurrently accessed by multiple
+        // threads. We use tmp here to avoid a race condition
+        SelectionKey tmp = sockKey;
+        if (tmp!=null) {
+           ((SocketChannel) tmp.channel()).socket().close();
+        }
+    }
+
+    @Override
+    void saslCompleted() {
+        enableWrite();
+    }
+
+    synchronized void enableWrite() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_WRITE) == 0) {
+            sockKey.interestOps(i | SelectionKey.OP_WRITE);
+        }
+    }
+
+    private synchronized void disableWrite() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_WRITE) != 0) {
+            sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
+        }
+    }
+
+    synchronized private void enableRead() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_READ) == 0) {
+            sockKey.interestOps(i | SelectionKey.OP_READ);
+        }
+    }
+
+    @Override
+    void connectionPrimed() {
+        sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+    }
+
+    Selector getSelector() {
+        return selector;
+    }
+
+    @Override
+    void sendPacket(Packet p) throws IOException {
+        SocketChannel sock = (SocketChannel) sockKey.channel();
+        if (sock == null) {
+            throw new IOException("Socket is null!");
+        }
+        p.createBB();
+        ByteBuffer pbb = p.bb;
+        sock.write(pbb);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
new file mode 100755
index 0000000..34c3db3
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.zookeeper.common.X509Exception.SSLContextException;
+
+/**
+ * ClientCnxnSocketNetty implements ClientCnxnSocket abstract methods.
+ * It's responsible for connecting to server, reading/writing network traffic and
+ * being a layer between network data and higher level packets.
+ */
+public class ClientCnxnSocketNetty extends ClientCnxnSocket {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
+
+    ChannelFactory channelFactory = new NioClientSocketChannelFactory(
+            Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+    Channel channel;
+    CountDownLatch firstConnect;
+    ChannelFuture connectFuture;
+    Lock connectLock = new ReentrantLock();
+    AtomicBoolean disconnected = new AtomicBoolean();
+    AtomicBoolean needSasl = new AtomicBoolean();
+    Semaphore waitSasl = new Semaphore(0);
+
+    ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
+        this.clientConfig = clientConfig;
+        initProperties();
+    }
+
+    /**
+     * lifecycles diagram:
+     * <p/>
+     * loop:
+     * - try:
+     * - - !isConnected()
+     * - - - connect()
+     * - - doTransport()
+     * - catch:
+     * - - cleanup()
+     * close()
+     * <p/>
+     * Other non-lifecycle methods are in jeopardy getting a null channel
+     * when calling in concurrency. We must handle it.
+     */
+
+    @Override
+    boolean isConnected() {
+        // Assuming that isConnected() is only used to initiate connection,
+        // not used by some other connection status judgement.
+        return channel != null;
+    }
+
+    @Override
+    void connect(InetSocketAddress addr) throws IOException {
+        firstConnect = new CountDownLatch(1);
+
+        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+
+        bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
+        bootstrap.setOption("soLinger", -1);
+        bootstrap.setOption("tcpNoDelay", true);
+
+        connectFuture = bootstrap.connect(addr);
+        connectFuture.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture channelFuture) throws Exception {
+                // this lock guarantees that channel won't be assgined after cleanup().
+                connectLock.lock();
+                try {
+                    if (!channelFuture.isSuccess() || connectFuture == null) {
+                        LOG.info("future isn't success, cause: {}", channelFuture.getCause());
+                        return;
+                    }
+                    // setup channel, variables, connection, etc.
+                    channel = channelFuture.getChannel();
+
+                    disconnected.set(false);
+                    initialized = false;
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+
+                    sendThread.primeConnection();
+                    updateNow();
+                    updateLastSendAndHeard();
+
+                    if (sendThread.tunnelAuthInProgress()) {
+                        waitSasl.drainPermits();
+                        needSasl.set(true);
+                        sendPrimePacket();
+                    } else {
+                        needSasl.set(false);
+                    }
+
+                    // we need to wake up on first connect to avoid timeout.
+                    wakeupCnxn();
+                    firstConnect.countDown();
+                    LOG.info("channel is connected: {}", channelFuture.getChannel());
+                } finally {
+                    connectLock.unlock();
+                }
+            }
+        });
+    }
+
+    @Override
+    void cleanup() {
+        connectLock.lock();
+        try {
+            if (connectFuture != null) {
+                connectFuture.cancel();
+                connectFuture = null;
+            }
+            if (channel != null) {
+                channel.close().awaitUninterruptibly();
+                channel = null;
+            }
+        } finally {
+            connectLock.unlock();
+        }
+        Iterator<Packet> iter = outgoingQueue.iterator();
+        while (iter.hasNext()) {
+            Packet p = iter.next();
+            if (p == WakeupPacket.getInstance()) {
+                iter.remove();
+            }
+        }
+    }
+
+    @Override
+    void close() {
+        channelFactory.releaseExternalResources();
+    }
+
+    @Override
+    void saslCompleted() {
+        needSasl.set(false);
+        waitSasl.release();
+    }
+
+    @Override
+    void connectionPrimed() {
+    }
+
+    @Override
+    void packetAdded() {
+    }
+
+    @Override
+    void onClosing() {
+        firstConnect.countDown();
+        wakeupCnxn();
+        LOG.info("channel is told closing");
+    }
+
+    private void wakeupCnxn() {
+        if (needSasl.get()) {
+            waitSasl.release();
+        }
+        outgoingQueue.add(WakeupPacket.getInstance());
+    }
+
+    @Override
+    void doTransport(int waitTimeOut,
+                     List<Packet> pendingQueue,
+                     ClientCnxn cnxn)
+            throws IOException, InterruptedException {
+        try {
+            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
+                return;
+            }
+            Packet head = null;
+            if (needSasl.get()) {
+                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
+                    return;
+                }
+            } else {
+                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
+                    return;
+                }
+            }
+            // check if being waken up on closing.
+            if (!sendThread.getZkState().isAlive()) {
+                // adding back the patck to notify of failure in conLossPacket().
+                addBack(head);
+                return;
+            }
+            // channel disconnection happened
+            if (disconnected.get()) {
+                addBack(head);
+                throw new EndOfStreamException("channel for sessionid 0x"
+                        + Long.toHexString(sessionId)
+                        + " is lost");
+            }
+            if (head != null) {
+                doWrite(pendingQueue, head, cnxn);
+            }
+        } finally {
+            updateNow();
+        }
+    }
+
+    private void addBack(Packet head) {
+        if (head != null && head != WakeupPacket.getInstance()) {
+            outgoingQueue.addFirst(head);
+        }
+    }
+
+    private void sendPkt(Packet p) {
+        // Assuming the packet will be sent out successfully. Because if it fails,
+        // the channel will close and clean up queues.
+        p.createBB();
+        updateLastSend();
+        sentCount++;
+        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+    }
+
+    private void sendPrimePacket() {
+        // assuming the first packet is the priming packet.
+        sendPkt(outgoingQueue.remove());
+    }
+
+    /**
+     * doWrite handles writing the packets from outgoingQueue via network to server.
+     */
+    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
+        updateNow();
+        while (true) {
+            if (p != WakeupPacket.getInstance()) {
+                if ((p.requestHeader != null) &&
+                        (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
+                        (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
+                    p.requestHeader.setXid(cnxn.getXid());
+                    synchronized (pendingQueue) {
+                        pendingQueue.add(p);
+                    }
+                }
+                sendPkt(p);
+            }
+            if (outgoingQueue.isEmpty()) {
+                break;
+            }
+            p = outgoingQueue.remove();
+        }
+    }
+
+    @Override
+    void sendPacket(ClientCnxn.Packet p) throws IOException {
+        if (channel == null) {
+            throw new IOException("channel has been closed");
+        }
+        sendPkt(p);
+    }
+
+    @Override
+    SocketAddress getRemoteSocketAddress() {
+        Channel copiedChanRef = channel;
+        return (copiedChanRef == null) ? null : copiedChanRef.getRemoteAddress();
+    }
+
+    @Override
+    SocketAddress getLocalSocketAddress() {
+        Channel copiedChanRef = channel;
+        return (copiedChanRef == null) ? null : copiedChanRef.getLocalAddress();
+    }
+
+    @Override
+    void testableCloseSocket() throws IOException {
+        Channel copiedChanRef = channel;
+        if (copiedChanRef != null) {
+            copiedChanRef.disconnect().awaitUninterruptibly();
+        }
+    }
+
+
+    // *************** <END> CientCnxnSocketNetty </END> ******************
+    private static class WakeupPacket {
+        private static final Packet instance = new Packet(null, null, null, null, null);
+
+        protected WakeupPacket() {
+            // Exists only to defeat instantiation.
+        }
+
+        public static Packet getInstance() {
+            return instance;
+        }
+    }
+
+    /**
+     * ZKClientPipelineFactory is the netty pipeline factory for this netty
+     * connection implementation.
+     */
+    private class ZKClientPipelineFactory implements ChannelPipelineFactory {
+        private SSLContext sslContext = null;
+        private SSLEngine sslEngine = null;
+        private String host;
+        private int port;
+
+        public ZKClientPipelineFactory(String host, int port) {
+            this.host = host;
+            this.port = port;
+        }
+
+        @Override
+        public ChannelPipeline getPipeline() throws Exception {
+            ChannelPipeline pipeline = Channels.pipeline();
+            if (clientConfig.getBoolean(ZKClientConfig.SECURE_CLIENT)) {
+                initSSL(pipeline);
+            }
+            pipeline.addLast("handler", new ZKClientHandler());
+            return pipeline;
+        }
+
+        // The synchronized is to prevent the race on shared variable "sslEngine".
+        // Basically we only need to create it once.
+        private synchronized void initSSL(ChannelPipeline pipeline) throws SSLContextException {
+            if (sslContext == null || sslEngine == null) {
+                sslContext = new ClientX509Util().createSSLContext(clientConfig);
+                sslEngine = sslContext.createSSLEngine(host,port);
+                sslEngine.setUseClientMode(true);
+            }
+            pipeline.addLast("ssl", new SslHandler(sslEngine));
+            LOG.info("SSL handler added for channel: {}", pipeline.getChannel());
+        }
+    }
+
+    /**
+     * ZKClientHandler is the netty handler that sits in netty upstream last
+     * place. It mainly handles read traffic and helps synchronize connection state.
+     */
+    private class ZKClientHandler extends SimpleChannelUpstreamHandler {
+        AtomicBoolean channelClosed = new AtomicBoolean(false);
+
+        @Override
+        public void channelDisconnected(ChannelHandlerContext ctx,
+                                        ChannelStateEvent e) throws Exception {
+            LOG.info("channel is disconnected: {}", ctx.getChannel());
+            cleanup();
+        }
+
+        /**
+         * netty handler has encountered problems. We are cleaning it up and tell outside to close
+         * the channel/connection.
+         */
+        private void cleanup() {
+            if (!channelClosed.compareAndSet(false, true)) {
+                return;
+            }
+            disconnected.set(true);
+            onClosing();
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx,
+                                    MessageEvent e) throws Exception {
+            updateNow();
+            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+            while (buf.readable()) {
+                if (incomingBuffer.remaining() > buf.readableBytes()) {
+                    int newLimit = incomingBuffer.position()
+                            + buf.readableBytes();
+                    incomingBuffer.limit(newLimit);
+                }
+                buf.readBytes(incomingBuffer);
+                incomingBuffer.limit(incomingBuffer.capacity());
+
+                if (!incomingBuffer.hasRemaining()) {
+                    incomingBuffer.flip();
+                    if (incomingBuffer == lenBuffer) {
+                        recvCount++;
+                        readLength();
+                    } else if (!initialized) {
+                        readConnectResult();
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+                        initialized = true;
+                        updateLastHeard();
+                    } else {
+                        sendThread.readResponse(incomingBuffer);
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+                        updateLastHeard();
+                    }
+                }
+            }
+            wakeupCnxn();
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx,
+                                    ExceptionEvent e) throws Exception {
+            LOG.warn("Exception caught: {}", e, e.getCause());
+            cleanup();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientWatchManager.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientWatchManager.java
new file mode 100644
index 0000000..d56374d
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientWatchManager.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.util.Set;
+
+/**
+ */
+public interface ClientWatchManager {
+    /**
+     * Return a set of watchers that should be notified of the event. The 
+     * manager must not notify the watcher(s), however it will update it's 
+     * internal structure as if the watches had triggered. The intent being 
+     * that the callee is now responsible for notifying the watchers of the 
+     * event, possibly at some later time.
+     * 
+     * @param state event state
+     * @param type event type
+     * @param path event path
+     * @return may be empty set but must not be null
+     */
+    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
+        Watcher.Event.EventType type, String path);
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/main/java/org/apache/zookeeper/CreateMode.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/CreateMode.java b/zookeeper-server/src/main/java/org/apache/zookeeper/CreateMode.java
new file mode 100644
index 0000000..587f7a1
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/CreateMode.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/***
+ *  CreateMode value determines how the znode is created on ZooKeeper.
+ */
+@InterfaceAudience.Public
+public enum CreateMode {
+    
+    /**
+     * The znode will not be automatically deleted upon client's disconnect.
+     */
+    PERSISTENT (0, false, false, false, false),
+    /**
+    * The znode will not be automatically deleted upon client's disconnect,
+    * and its name will be appended with a monotonically increasing number.
+    */
+    PERSISTENT_SEQUENTIAL (2, false, true, false, false),
+    /**
+     * The znode will be deleted upon the client's disconnect.
+     */
+    EPHEMERAL (1, true, false, false, false),
+    /**
+     * The znode will be deleted upon the client's disconnect, and its name
+     * will be appended with a monotonically increasing number.
+     */
+    EPHEMERAL_SEQUENTIAL (3, true, true, false, false),
+    /**
+     * The znode will be a container node. Container
+     * nodes are special purpose nodes useful for recipes such as leader, lock,
+     * etc. When the last child of a container is deleted, the container becomes
+     * a candidate to be deleted by the server at some point in the future.
+     * Given this property, you should be prepared to get
+     * {@link org.apache.zookeeper.KeeperException.NoNodeException}
+     * when creating children inside of this container node.
+     */
+    CONTAINER (4, false, false, true, false),
+    /**
+     * The znode will not be automatically deleted upon client's disconnect.
+     * However if the znode has not been modified within the given TTL, it
+     * will be deleted once it has no children.
+     */
+    PERSISTENT_WITH_TTL(5, false, false, false, true),
+    /**
+     * The znode will not be automatically deleted upon client's disconnect,
+     * and its name will be appended with a monotonically increasing number.
+     * However if the znode has not been modified within the given TTL, it
+     * will be deleted once it has no children.
+     */
+    PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true);
+
+    private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class);
+
+    private boolean ephemeral;
+    private boolean sequential;
+    private final boolean isContainer;
+    private int flag;
+    private boolean isTTL;
+
+    CreateMode(int flag, boolean ephemeral, boolean sequential,
+               boolean isContainer, boolean isTTL) {
+        this.flag = flag;
+        this.ephemeral = ephemeral;
+        this.sequential = sequential;
+        this.isContainer = isContainer;
+        this.isTTL = isTTL;
+    }
+
+    public boolean isEphemeral() { 
+        return ephemeral;
+    }
+
+    public boolean isSequential() { 
+        return sequential;
+    }
+
+    public boolean isContainer() {
+        return isContainer;
+    }
+
+    public boolean isTTL() {
+        return isTTL;
+    }
+
+    public int toFlag() {
+        return flag;
+    }
+
+    /**
+     * Map an integer value to a CreateMode value
+     */
+    static public CreateMode fromFlag(int flag) throws KeeperException {
+        switch(flag) {
+        case 0: return CreateMode.PERSISTENT;
+
+        case 1: return CreateMode.EPHEMERAL;
+
+        case 2: return CreateMode.PERSISTENT_SEQUENTIAL;
+
+        case 3: return CreateMode.EPHEMERAL_SEQUENTIAL ;
+
+        case 4: return CreateMode.CONTAINER;
+
+        case 5: return CreateMode.PERSISTENT_WITH_TTL;
+
+        case 6: return CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL;
+
+        default:
+            String errMsg = "Received an invalid flag value: " + flag
+                    + " to convert to a CreateMode";
+            LOG.error(errMsg);
+            throw new KeeperException.BadArgumentsException(errMsg);
+        }
+    }
+
+    /**
+     * Map an integer value to a CreateMode value
+     */
+    static public CreateMode fromFlag(int flag, CreateMode defaultMode) {
+        switch(flag) {
+            case 0:
+                return CreateMode.PERSISTENT;
+
+            case 1:
+                return CreateMode.EPHEMERAL;
+
+            case 2:
+                return CreateMode.PERSISTENT_SEQUENTIAL;
+
+            case 3:
+                return CreateMode.EPHEMERAL_SEQUENTIAL;
+
+            case 4:
+                return CreateMode.CONTAINER;
+
+            case 5:
+                return CreateMode.PERSISTENT_WITH_TTL;
+
+            case 6:
+                return CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL;
+
+            default:
+                return defaultMode;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/main/java/org/apache/zookeeper/Environment.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/Environment.java b/zookeeper-server/src/main/java/org/apache/zookeeper/Environment.java
new file mode 100644
index 0000000..7bd5c7f
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/Environment.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+
+/**
+ * Provide insight into the runtime environment.
+ *
+ */
+public class Environment {
+    public static final String JAAS_CONF_KEY = "java.security.auth.login.config";
+
+    public static class Entry {
+        private String k;
+        private String v;
+        public Entry(String k, String v) {
+            this.k = k;
+            this.v = v;
+        }
+        public String getKey() { return k; }
+        public String getValue() { return v; }
+        
+        @Override
+        public String toString() {
+            return k + "=" + v;
+        }
+    }
+
+    private static void put(List<Entry> l, String k, String v) {
+        l.add(new Entry(k,v));
+    }
+
+    public static List<Entry> list() {
+        List<Entry> l = new ArrayList<Entry>();
+        put(l, "zookeeper.version", Version.getFullVersion());
+
+        try {
+            put(l, "host.name",
+                InetAddress.getLocalHost().getCanonicalHostName());
+        } catch (UnknownHostException e) {
+            put(l, "host.name", "<NA>");
+        }
+
+        put(l, "java.version",
+                System.getProperty("java.version", "<NA>"));
+        put(l, "java.vendor",
+                System.getProperty("java.vendor", "<NA>"));
+        put(l, "java.home",
+                System.getProperty("java.home", "<NA>"));
+        put(l, "java.class.path",
+                System.getProperty("java.class.path", "<NA>"));
+        put(l, "java.library.path",
+                System.getProperty("java.library.path", "<NA>"));
+        put(l, "java.io.tmpdir",
+                System.getProperty("java.io.tmpdir", "<NA>"));
+        put(l, "java.compiler",
+                System.getProperty("java.compiler", "<NA>"));
+        put(l, "os.name",
+                System.getProperty("os.name", "<NA>"));
+        put(l, "os.arch",
+                System.getProperty("os.arch", "<NA>"));
+        put(l, "os.version",
+                System.getProperty("os.version", "<NA>"));
+        put(l, "user.name",
+                System.getProperty("user.name", "<NA>"));
+        put(l, "user.home",
+                System.getProperty("user.home", "<NA>"));
+        put(l, "user.dir",
+                System.getProperty("user.dir", "<NA>"));
+
+        // Get memory information.
+        Runtime runtime = Runtime.getRuntime();
+        int mb = 1024 * 1024;
+        put(l, "os.memory.free",
+               Long.toString(runtime.freeMemory() / mb) + "MB");
+        put(l, "os.memory.max",
+               Long.toString(runtime.maxMemory() / mb) + "MB");
+        put(l, "os.memory.total",
+               Long.toString(runtime.totalMemory() / mb) + "MB");
+
+        return l;
+    }
+    
+    public static void logEnv(String msg, Logger log) {
+        List<Entry> env = Environment.list();
+        for (Entry e : env) {
+            log.info(msg + e.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/main/java/org/apache/zookeeper/JLineZNodeCompleter.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/JLineZNodeCompleter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/JLineZNodeCompleter.java
new file mode 100644
index 0000000..fa6126b
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/JLineZNodeCompleter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.util.Collections;
+import java.util.List;
+
+import jline.console.completer.Completer;
+
+class JLineZNodeCompleter implements Completer {
+    private ZooKeeper zk;
+
+    public JLineZNodeCompleter(ZooKeeper zk) {
+        this.zk = zk;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public int complete(String buffer, int cursor, List candidates) {
+        // Guarantee that the final token is the one we're expanding
+        buffer = buffer.substring(0,cursor);
+        String token = "";
+        if (!buffer.endsWith(" ")) {
+            String[] tokens = buffer.split(" ");
+            if (tokens.length != 0) {
+                token = tokens[tokens.length-1] ;
+            }
+        }
+
+        if (token.startsWith("/")){
+            return completeZNode( buffer, token, candidates);
+        }
+        return completeCommand(buffer, token, candidates);
+    }
+
+    private int completeCommand(String buffer, String token,
+            List<String> candidates)
+    {
+        for (String cmd : ZooKeeperMain.getCommands()) {
+            if (cmd.startsWith( token )) {
+                candidates.add(cmd);
+            }
+        }
+        return buffer.lastIndexOf(" ")+1;
+    }
+
+    private int completeZNode( String buffer, String token,
+            List<String> candidates)
+    {
+        String path = token;
+        int idx = path.lastIndexOf("/") + 1;
+        String prefix = path.substring(idx);
+        try {
+            // Only the root path can end in a /, so strip it off every other prefix
+            String dir = idx == 1 ? "/" : path.substring(0,idx-1);
+            List<String> children = zk.getChildren(dir, false);
+            for (String child : children) {
+                if (child.startsWith(prefix)) {
+                    candidates.add( child );
+                }
+            }
+        } catch( InterruptedException e) {
+            return 0;
+        }
+        catch( KeeperException e) {
+            return 0;
+        }
+        Collections.sort(candidates);
+        return candidates.size() == 0 ? buffer.length() : buffer.lastIndexOf("/") + 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
new file mode 100644
index 0000000..f797bb0
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
@@ -0,0 +1,859 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+public abstract class KeeperException extends Exception {
+    /**
+     * All multi-requests that result in an exception retain the results
+     * here so that it is possible to examine the problems in the catch
+     * scope.  Non-multi requests will get a null if they try to access
+     * these results.
+     */
+    private List<OpResult> results;
+
+    /**
+     * All non-specific keeper exceptions should be constructed via
+     * this factory method in order to guarantee consistency in error
+     * codes and such.  If you know the error code, then you should
+     * construct the special purpose exception directly.  That will
+     * allow you to have the most specific possible declarations of
+     * what exceptions might actually be thrown.
+     *
+     * @param code The error code.
+     * @param path The ZooKeeper path being operated on.
+     * @return The specialized exception, presumably to be thrown by
+     *  the caller.
+     */
+    public static KeeperException create(Code code, String path) {
+        KeeperException r = create(code);
+        r.path = path;
+        return r;
+    }
+
+    /**
+     * @deprecated deprecated in 3.1.0, use {@link #create(Code, String)}
+     * instead
+     */
+    @Deprecated
+    public static KeeperException create(int code, String path) {
+        KeeperException r = create(Code.get(code));
+        r.path = path;
+        return r;
+    }
+
+    /**
+     * @deprecated deprecated in 3.1.0, use {@link #create(Code)}
+     * instead
+     */
+    @Deprecated
+    public static KeeperException create(int code) {
+        return create(Code.get(code));
+    }
+
+    /**
+     * All non-specific keeper exceptions should be constructed via
+     * this factory method in order to guarantee consistency in error
+     * codes and such.  If you know the error code, then you should
+     * construct the special purpose exception directly.  That will
+     * allow you to have the most specific possible declarations of
+     * what exceptions might actually be thrown.
+     *
+     * @param code The error code of your new exception.  This will
+     * also determine the specific type of the exception that is
+     * returned.
+     * @return The specialized exception, presumably to be thrown by
+     * the caller.
+     */
+    public static KeeperException create(Code code) {
+        switch (code) {
+            case SYSTEMERROR:
+                return new SystemErrorException();
+            case RUNTIMEINCONSISTENCY:
+                return new RuntimeInconsistencyException();
+            case DATAINCONSISTENCY:
+                return new DataInconsistencyException();
+            case CONNECTIONLOSS:
+                return new ConnectionLossException();
+            case MARSHALLINGERROR:
+                return new MarshallingErrorException();
+            case UNIMPLEMENTED:
+                return new UnimplementedException();
+            case OPERATIONTIMEOUT:
+                return new OperationTimeoutException();
+            case NEWCONFIGNOQUORUM:
+               return new NewConfigNoQuorum();
+            case RECONFIGINPROGRESS:
+               return new ReconfigInProgress();
+            case BADARGUMENTS:
+                return new BadArgumentsException();
+            case APIERROR:
+                return new APIErrorException();
+            case NONODE:
+                return new NoNodeException();
+            case NOAUTH:
+                return new NoAuthException();
+            case BADVERSION:
+                return new BadVersionException();
+            case NOCHILDRENFOREPHEMERALS:
+                return new NoChildrenForEphemeralsException();
+            case NODEEXISTS:
+                return new NodeExistsException();
+            case INVALIDACL:
+                return new InvalidACLException();
+            case AUTHFAILED:
+                return new AuthFailedException();
+            case NOTEMPTY:
+                return new NotEmptyException();
+            case SESSIONEXPIRED:
+                return new SessionExpiredException();
+            case INVALIDCALLBACK:
+                return new InvalidCallbackException();
+            case SESSIONMOVED:
+                return new SessionMovedException();
+            case NOTREADONLY:
+                return new NotReadOnlyException();
+            case EPHEMERALONLOCALSESSION:
+                return new EphemeralOnLocalSessionException();
+            case NOWATCHER:
+                return new NoWatcherException();
+            case RECONFIGDISABLED:
+                return new ReconfigDisabledException();
+            case REQUESTTIMEOUT:
+                return new RequestTimeoutException();
+            case OK:
+            default:
+                throw new IllegalArgumentException("Invalid exception code");
+        }
+    }
+
+    /**
+     * Set the code for this exception
+     * @param code error code
+     * @deprecated deprecated in 3.1.0, exceptions should be immutable, this
+     * method should not be used
+     */
+    @Deprecated
+    public void setCode(int code) {
+        this.code = Code.get(code);
+    }
+
+    /** This interface contains the original static final int constants
+     * which have now been replaced with an enumeration in Code. Do not
+     * reference this class directly, if necessary (legacy code) continue
+     * to access the constants through Code.
+     * Note: an interface is used here due to the fact that enums cannot
+     * reference constants defined within the same enum as said constants
+     * are considered initialized _after_ the enum itself. By using an
+     * interface as a super type this allows the deprecated constants to
+     * be initialized first and referenced when constructing the enums. I
+     * didn't want to have constants declared twice. This
+     * interface should be private, but it's declared public to enable
+     * javadoc to include in the user API spec.
+     */
+    @Deprecated
+    @InterfaceAudience.Public
+    public interface CodeDeprecated {
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#OK} instead
+         */
+        @Deprecated
+        public static final int Ok = 0;
+
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#SYSTEMERROR} instead
+         */
+        @Deprecated
+        public static final int SystemError = -1;
+        /**
+         * @deprecated deprecated in 3.1.0, use
+         * {@link Code#RUNTIMEINCONSISTENCY} instead
+         */
+        @Deprecated
+        public static final int RuntimeInconsistency = -2;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#DATAINCONSISTENCY}
+         * instead
+         */
+        @Deprecated
+        public static final int DataInconsistency = -3;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#CONNECTIONLOSS}
+         * instead
+         */
+        @Deprecated
+        public static final int ConnectionLoss = -4;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#MARSHALLINGERROR}
+         * instead
+         */
+        @Deprecated
+        public static final int MarshallingError = -5;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#UNIMPLEMENTED}
+         * instead
+         */
+        @Deprecated
+        public static final int Unimplemented = -6;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#OPERATIONTIMEOUT}
+         * instead
+         */
+        @Deprecated
+        public static final int OperationTimeout = -7;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#BADARGUMENTS}
+         * instead
+         */
+        @Deprecated
+        public static final int BadArguments = -8;
+
+        @Deprecated
+        public static final int UnknownSession= -12;
+
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#NEWCONFIGNOQUORUM}
+         * instead
+         */
+        @Deprecated
+        public static final int NewConfigNoQuorum = -13;
+
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#RECONFIGINPROGRESS}
+         * instead
+         */
+        @Deprecated
+        public static final int ReconfigInProgress= -14;
+
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#APIERROR} instead
+         */
+        @Deprecated
+        public static final int APIError = -100;
+
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#NONODE} instead
+         */
+        @Deprecated
+        public static final int NoNode = -101;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#NOAUTH} instead
+         */
+        @Deprecated
+        public static final int NoAuth = -102;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#BADVERSION} instead
+         */
+        @Deprecated
+        public static final int BadVersion = -103;
+        /**
+         * @deprecated deprecated in 3.1.0, use
+         * {@link Code#NOCHILDRENFOREPHEMERALS}
+         * instead
+         */
+        @Deprecated
+        public static final int NoChildrenForEphemerals = -108;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#NODEEXISTS} instead
+         */
+        @Deprecated
+        public static final int NodeExists = -110;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#NOTEMPTY} instead
+         */
+        @Deprecated
+        public static final int NotEmpty = -111;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#SESSIONEXPIRED} instead
+         */
+        @Deprecated
+        public static final int SessionExpired = -112;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#INVALIDCALLBACK}
+         * instead
+         */
+        @Deprecated
+        public static final int InvalidCallback = -113;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#INVALIDACL} instead
+         */
+        @Deprecated
+        public static final int InvalidACL = -114;
+        /**
+         * @deprecated deprecated in 3.1.0, use {@link Code#AUTHFAILED} instead
+         */
+        @Deprecated
+        public static final int AuthFailed = -115;
+        
+        // This value will be used directly in {@link CODE#SESSIONMOVED}
+        // public static final int SessionMoved = -118;       
+        
+        @Deprecated
+        public static final int EphemeralOnLocalSession = -120;
+
+    }
+
+    /** Codes which represent the various KeeperException
+     * types. This enum replaces the deprecated earlier static final int
+     * constants. The old, deprecated, values are in "camel case" while the new
+     * enum values are in all CAPS.
+     */
+    @InterfaceAudience.Public
+    public static enum Code implements CodeDeprecated {
+        /** Everything is OK */
+        OK (Ok),
+
+        /** System and server-side errors.
+         * This is never thrown by the server, it shouldn't be used other than
+         * to indicate a range. Specifically error codes greater than this
+         * value, but lesser than {@link #APIERROR}, are system errors.
+         */
+        SYSTEMERROR (SystemError),
+
+        /** A runtime inconsistency was found */
+        RUNTIMEINCONSISTENCY (RuntimeInconsistency),
+        /** A data inconsistency was found */
+        DATAINCONSISTENCY (DataInconsistency),
+        /** Connection to the server has been lost */
+        CONNECTIONLOSS (ConnectionLoss),
+        /** Error while marshalling or unmarshalling data */
+        MARSHALLINGERROR (MarshallingError),
+        /** Operation is unimplemented */
+        UNIMPLEMENTED (Unimplemented),
+        /** Operation timeout */
+        OPERATIONTIMEOUT (OperationTimeout),
+        /** Invalid arguments */
+        BADARGUMENTS (BadArguments),
+        /** No quorum of new config is connected and up-to-date with the leader of last commmitted config - try 
+         *  invoking reconfiguration after new servers are connected and synced */
+        NEWCONFIGNOQUORUM (NewConfigNoQuorum),
+        /** Another reconfiguration is in progress -- concurrent reconfigs not supported (yet) */
+        RECONFIGINPROGRESS (ReconfigInProgress),
+        /** Unknown session (internal server use only) */
+        UNKNOWNSESSION (UnknownSession),
+        
+        /** API errors.
+         * This is never thrown by the server, it shouldn't be used other than
+         * to indicate a range. Specifically error codes greater than this
+         * value are API errors (while values less than this indicate a
+         * {@link #SYSTEMERROR}).
+         */
+        APIERROR (APIError),
+
+        /** Node does not exist */
+        NONODE (NoNode),
+        /** Not authenticated */
+        NOAUTH (NoAuth),
+        /** Version conflict
+            In case of reconfiguration: reconfig requested from config version X but last seen config has a different version Y */
+        BADVERSION (BadVersion),
+        /** Ephemeral nodes may not have children */
+        NOCHILDRENFOREPHEMERALS (NoChildrenForEphemerals),
+        /** The node already exists */
+        NODEEXISTS (NodeExists),
+        /** The node has children */
+        NOTEMPTY (NotEmpty),
+        /** The session has been expired by the server */
+        SESSIONEXPIRED (SessionExpired),
+        /** Invalid callback specified */
+        INVALIDCALLBACK (InvalidCallback),
+        /** Invalid ACL specified */
+        INVALIDACL (InvalidACL),
+        /** Client authentication failed */
+        AUTHFAILED (AuthFailed),
+        /** Session moved to another server, so operation is ignored */
+        SESSIONMOVED (-118),
+        /** State-changing request is passed to read-only server */
+        NOTREADONLY (-119),
+        /** Attempt to create ephemeral node on a local session */
+        EPHEMERALONLOCALSESSION (EphemeralOnLocalSession),
+        /** Attempts to remove a non-existing watcher */
+        NOWATCHER (-121),
+        /** Request not completed within max allowed time.*/
+        REQUESTTIMEOUT (-122),
+        /** Attempts to perform a reconfiguration operation when reconfiguration feature is disabled. */
+        RECONFIGDISABLED(-123);
+
+        private static final Map<Integer,Code> lookup
+            = new HashMap<Integer,Code>();
+
+        static {
+            for(Code c : EnumSet.allOf(Code.class))
+                lookup.put(c.code, c);
+        }
+
+        private final int code;
+        Code(int code) {
+            this.code = code;
+        }
+
+        /**
+         * Get the int value for a particular Code.
+         * @return error code as integer
+         */
+        public int intValue() { return code; }
+
+        /**
+         * Get the Code value for a particular integer error code
+         * @param code int error code
+         * @return Code value corresponding to specified int code, or null
+         */
+        public static Code get(int code) {
+            return lookup.get(code);
+        }
+    }
+
+    static String getCodeMessage(Code code) {
+        switch (code) {
+            case OK:
+                return "ok";
+            case SYSTEMERROR:
+                return "SystemError";
+            case RUNTIMEINCONSISTENCY:
+                return "RuntimeInconsistency";
+            case DATAINCONSISTENCY:
+                return "DataInconsistency";
+            case CONNECTIONLOSS:
+                return "ConnectionLoss";
+            case MARSHALLINGERROR:
+                return "MarshallingError";
+            case NEWCONFIGNOQUORUM:
+               return "NewConfigNoQuorum";
+            case RECONFIGINPROGRESS:
+               return "ReconfigInProgress";
+            case UNIMPLEMENTED:
+                return "Unimplemented";
+            case OPERATIONTIMEOUT:
+                return "OperationTimeout";
+            case BADARGUMENTS:
+                return "BadArguments";
+            case APIERROR:
+                return "APIError";
+            case NONODE:
+                return "NoNode";
+            case NOAUTH:
+                return "NoAuth";
+            case BADVERSION:
+                return "BadVersion";
+            case NOCHILDRENFOREPHEMERALS:
+                return "NoChildrenForEphemerals";
+            case NODEEXISTS:
+                return "NodeExists";
+            case INVALIDACL:
+                return "InvalidACL";
+            case AUTHFAILED:
+                return "AuthFailed";
+            case NOTEMPTY:
+                return "Directory not empty";
+            case SESSIONEXPIRED:
+                return "Session expired";
+            case INVALIDCALLBACK:
+                return "Invalid callback";
+            case SESSIONMOVED:
+                return "Session moved";
+            case NOTREADONLY:
+                return "Not a read-only call";
+            case EPHEMERALONLOCALSESSION:
+                return "Ephemeral node on local session";
+            case NOWATCHER:
+                return "No such watcher";
+            case RECONFIGDISABLED:
+                return "Reconfig is disabled";
+            default:
+                return "Unknown error " + code;
+        }
+    }
+
+    private Code code;
+
+    private String path;
+
+    public KeeperException(Code code) {
+        this.code = code;
+    }
+
+    KeeperException(Code code, String path) {
+        this.code = code;
+        this.path = path;
+    }
+
+    /**
+     * Read the error code for this exception
+     * @return the error code for this exception
+     * @deprecated deprecated in 3.1.0, use {@link #code()} instead
+     */
+    @Deprecated
+    public int getCode() {
+        return code.code;
+    }
+
+    /**
+     * Read the error Code for this exception
+     * @return the error Code for this exception
+     */
+    public Code code() {
+        return code;
+    }
+
+    /**
+     * Read the path for this exception
+     * @return the path associated with this error, null if none
+     */
+    public String getPath() {
+        return path;
+    }
+
+    @Override
+    public String getMessage() {
+        if (path == null || path.isEmpty()) {
+            return "KeeperErrorCode = " + getCodeMessage(code);
+        }
+        return "KeeperErrorCode = " + getCodeMessage(code) + " for " + path;
+    }
+
+    void setMultiResults(List<OpResult> results) {
+        this.results = results;
+    }
+
+    /**
+     * If this exception was thrown by a multi-request then the (partial) results
+     * and error codes can be retrieved using this getter.
+     * @return A copy of the list of results from the operations in the multi-request.
+     *
+     * @since 3.4.0
+     *
+     */
+    public List<OpResult> getResults() {
+        return results != null ? new ArrayList<OpResult>(results) : null;
+    }
+
+    /**
+     *  @see Code#APIERROR
+     */
+    @InterfaceAudience.Public
+    public static class APIErrorException extends KeeperException {
+        public APIErrorException() {
+            super(Code.APIERROR);
+        }
+    }
+
+    /**
+     *  @see Code#AUTHFAILED
+     */
+    @InterfaceAudience.Public
+    public static class AuthFailedException extends KeeperException {
+        public AuthFailedException() {
+            super(Code.AUTHFAILED);
+        }
+    }
+
+    /**
+     *  @see Code#BADARGUMENTS
+     */
+    @InterfaceAudience.Public
+    public static class BadArgumentsException extends KeeperException {
+        public BadArgumentsException() {
+            super(Code.BADARGUMENTS);
+        }
+        public BadArgumentsException(String path) {
+            super(Code.BADARGUMENTS, path);
+        }
+    }
+
+    /**
+     * @see Code#BADVERSION
+     */
+    @InterfaceAudience.Public
+    public static class BadVersionException extends KeeperException {
+        public BadVersionException() {
+            super(Code.BADVERSION);
+        }
+        public BadVersionException(String path) {
+            super(Code.BADVERSION, path);
+        }
+    }
+
+    /**
+     * @see Code#CONNECTIONLOSS
+     */
+    @InterfaceAudience.Public
+    public static class ConnectionLossException extends KeeperException {
+        public ConnectionLossException() {
+            super(Code.CONNECTIONLOSS);
+        }
+    }
+
+    /**
+     * @see Code#DATAINCONSISTENCY
+     */
+    @InterfaceAudience.Public
+    public static class DataInconsistencyException extends KeeperException {
+        public DataInconsistencyException() {
+            super(Code.DATAINCONSISTENCY);
+        }
+    }
+
+    /**
+     * @see Code#INVALIDACL
+     */
+    @InterfaceAudience.Public
+    public static class InvalidACLException extends KeeperException {
+        public InvalidACLException() {
+            super(Code.INVALIDACL);
+        }
+        public InvalidACLException(String path) {
+            super(Code.INVALIDACL, path);
+        }
+    }
+
+    /**
+     * @see Code#INVALIDCALLBACK
+     */
+    @InterfaceAudience.Public
+    public static class InvalidCallbackException extends KeeperException {
+        public InvalidCallbackException() {
+            super(Code.INVALIDCALLBACK);
+        }
+    }
+
+    /**
+     * @see Code#MARSHALLINGERROR
+     */
+    @InterfaceAudience.Public
+    public static class MarshallingErrorException extends KeeperException {
+        public MarshallingErrorException() {
+            super(Code.MARSHALLINGERROR);
+        }
+    }
+
+    /**
+     * @see Code#NOAUTH
+     */
+    @InterfaceAudience.Public
+    public static class NoAuthException extends KeeperException {
+        public NoAuthException() {
+            super(Code.NOAUTH);
+        }
+    }
+
+    /**
+     * @see Code#NEWCONFIGNOQUORUM
+     */
+    @InterfaceAudience.Public
+    public static class NewConfigNoQuorum extends KeeperException {
+        public NewConfigNoQuorum() {
+            super(Code.NEWCONFIGNOQUORUM);
+        }
+    }
+    
+    /**
+     * @see Code#RECONFIGINPROGRESS
+     */
+    @InterfaceAudience.Public
+    public static class ReconfigInProgress extends KeeperException {
+        public ReconfigInProgress() {
+            super(Code.RECONFIGINPROGRESS);
+        }
+    }
+    
+    /**
+     * @see Code#NOCHILDRENFOREPHEMERALS
+     */
+    @InterfaceAudience.Public
+    public static class NoChildrenForEphemeralsException extends KeeperException {
+        public NoChildrenForEphemeralsException() {
+            super(Code.NOCHILDRENFOREPHEMERALS);
+        }
+        public NoChildrenForEphemeralsException(String path) {
+            super(Code.NOCHILDRENFOREPHEMERALS, path);
+        }
+    }
+
+    /**
+     * @see Code#NODEEXISTS
+     */
+    @InterfaceAudience.Public
+    public static class NodeExistsException extends KeeperException {
+        public NodeExistsException() {
+            super(Code.NODEEXISTS);
+        }
+        public NodeExistsException(String path) {
+            super(Code.NODEEXISTS, path);
+        }
+    }
+
+    /**
+     * @see Code#NONODE
+     */
+    @InterfaceAudience.Public
+    public static class NoNodeException extends KeeperException {
+        public NoNodeException() {
+            super(Code.NONODE);
+        }
+        public NoNodeException(String path) {
+            super(Code.NONODE, path);
+        }
+    }
+
+    /**
+     * @see Code#NOTEMPTY
+     */
+    @InterfaceAudience.Public
+    public static class NotEmptyException extends KeeperException {
+        public NotEmptyException() {
+            super(Code.NOTEMPTY);
+        }
+        public NotEmptyException(String path) {
+            super(Code.NOTEMPTY, path);
+        }
+    }
+
+    /**
+     * @see Code#OPERATIONTIMEOUT
+     */
+    @InterfaceAudience.Public
+    public static class OperationTimeoutException extends KeeperException {
+        public OperationTimeoutException() {
+            super(Code.OPERATIONTIMEOUT);
+        }
+    }
+
+    /**
+     * @see Code#RUNTIMEINCONSISTENCY
+     */
+    @InterfaceAudience.Public
+    public static class RuntimeInconsistencyException extends KeeperException {
+        public RuntimeInconsistencyException() {
+            super(Code.RUNTIMEINCONSISTENCY);
+        }
+    }
+
+    /**
+     * @see Code#SESSIONEXPIRED
+     */
+    @InterfaceAudience.Public
+    public static class SessionExpiredException extends KeeperException {
+        public SessionExpiredException() {
+            super(Code.SESSIONEXPIRED);
+        }
+    }
+
+    /**
+     * @see Code#UNKNOWNSESSION
+     */
+    @InterfaceAudience.Public
+    public static class UnknownSessionException extends KeeperException {
+        public UnknownSessionException() {
+            super(Code.UNKNOWNSESSION);
+        }
+    }
+
+    /**
+     * @see Code#SESSIONMOVED
+     */
+    @InterfaceAudience.Public
+    public static class SessionMovedException extends KeeperException {
+        public SessionMovedException() {
+            super(Code.SESSIONMOVED);
+        }
+    }
+
+    /**
+     * @see Code#NOTREADONLY
+     */
+    @InterfaceAudience.Public
+    public static class NotReadOnlyException extends KeeperException {
+        public NotReadOnlyException() {
+            super(Code.NOTREADONLY);
+        }
+    }
+
+    /**
+     * @see Code#EPHEMERALONLOCALSESSION
+     */
+    @InterfaceAudience.Public
+    public static class EphemeralOnLocalSessionException extends KeeperException {
+        public EphemeralOnLocalSessionException() {
+            super(Code.EPHEMERALONLOCALSESSION);
+        }
+    }
+
+    /**
+     * @see Code#SYSTEMERROR
+     */
+    @InterfaceAudience.Public
+    public static class SystemErrorException extends KeeperException {
+        public SystemErrorException() {
+            super(Code.SYSTEMERROR);
+        }
+    }
+
+    /**
+     * @see Code#UNIMPLEMENTED
+     */
+    @InterfaceAudience.Public
+    public static class UnimplementedException extends KeeperException {
+        public UnimplementedException() {
+            super(Code.UNIMPLEMENTED);
+        }
+    }
+
+    /**
+     * @see Code#NOWATCHER
+     */
+    @InterfaceAudience.Public
+    public static class NoWatcherException extends KeeperException {
+        public NoWatcherException() {
+            super(Code.NOWATCHER);
+        }
+
+        public NoWatcherException(String path) {
+            super(Code.NOWATCHER, path);
+        }
+    }
+
+    /**
+     * @see Code#RECONFIGDISABLED
+     */
+    @InterfaceAudience.Public
+    public static class ReconfigDisabledException extends KeeperException {
+        public ReconfigDisabledException() { super(Code.RECONFIGDISABLED); }
+        public ReconfigDisabledException(String path) {
+            super(Code.RECONFIGDISABLED, path);
+        }
+    }
+
+    /**
+     * @see Code#REQUESTTIMEOUT
+     */
+    public static class RequestTimeoutException extends KeeperException {
+        public RequestTimeoutException() {
+            super(Code.REQUESTTIMEOUT);
+        }
+    }
+}


Mime
View raw message