bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject [2/2] bookkeeper git commit: BOOKKEEPER-796: Make bookkeeper client use reconnectable zookeeper wrapper (sijie via fpj)
Date Fri, 10 Jul 2015 20:50:25 GMT
BOOKKEEPER-796: Make bookkeeper client use reconnectable zookeeper wrapper (sijie via fpj)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/a80bba27
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/a80bba27
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/a80bba27

Branch: refs/heads/master
Commit: a80bba272ecd7344ac552f3888ef5db49cbbdeb5
Parents: 8df7e0f
Author: fpj <fpj@apache.org>
Authored: Fri Jul 10 21:48:59 2015 +0100
Committer: fpj <fpj@apache.org>
Committed: Fri Jul 10 21:48:59 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/bookkeeper/bookie/Bookie.java    |  28 +-
 .../apache/bookkeeper/bookie/BookieShell.java   |  41 +-
 .../bookkeeper/bookie/FileSystemUpgrade.java    |  47 +-
 .../apache/bookkeeper/client/BookKeeper.java    | 138 +++---
 .../bookkeeper/client/BookKeeperAdmin.java      |  16 +-
 .../apache/bookkeeper/replication/Auditor.java  |   7 +-
 .../replication/AutoRecoveryMain.java           |  24 +-
 .../apache/bookkeeper/util/LocalBookKeeper.java |  44 +-
 .../org/apache/bookkeeper/util/MathUtils.java   |  13 +
 .../org/apache/bookkeeper/util/ZkUtils.java     |  27 +-
 .../ExponentialBackoffRetryPolicy.java          |   2 +-
 .../bookkeeper/zookeeper/ZooKeeperClient.java   | 493 ++++++++++++++-----
 .../zookeeper/ZooKeeperWatcherBase.java         |  72 ++-
 .../apache/bookkeeper/zookeeper/ZooWorker.java  |  50 +-
 .../bookie/BookieInitializationTest.java        |  32 +-
 .../client/BookKeeperClientZKSessionExpiry.java |  79 +++
 .../bookkeeper/meta/TestLedgerManager.java      |  17 +-
 .../replication/AuditorBookieTest.java          |  10 +-
 .../AuditorPeriodicBookieCheckTest.java         |  12 +-
 .../replication/AuditorPeriodicCheckTest.java   |  10 +-
 .../TestLedgerUnderreplicationManager.java      |  17 +-
 .../replication/TestReplicationWorker.java      |  15 +-
 .../apache/bookkeeper/test/ZooKeeperUtil.java   |   8 +-
 .../zookeeper/TestZooKeeperClient.java          |   3 +-
 25 files changed, 772 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ebd39a8..2cbc970 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -83,6 +83,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-858: Fix broken links and typos in bookkeeper documents (Youngjoon Kim via sijie)
 
+      BOOKKEEPER-796: Make bookkeeper client use reconnectable zookeeper wrapper (sijie via fpj)
+
       bookkeeper-client:
 
         BOOKKEEPER-810: Allow to configure TCP connect timeout (Charles Xie via sijie)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 3078ff1..07b3d30 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -31,7 +31,9 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -62,6 +64,8 @@ import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 import org.apache.commons.io.FileUtils;
 import org.apache.zookeeper.CreateMode;
@@ -70,6 +74,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
@@ -739,7 +744,7 @@ public class Bookie extends BookieCriticalThread {
             return null;
         }
         // Create the ZooKeeper client instance
-        return newZookeeper(conf.getZkServers(), conf.getZkTimeout());
+        return newZookeeper(conf);
     }
 
     /**
@@ -939,27 +944,30 @@ public class Bookie extends BookieCriticalThread {
      * are processed and quit. It is done by calling <b>shutdown</b>.
      * </p>
      *
-     * @param zkServers the quorum list of zk servers
-     * @param sessionTimeout session timeout of zk connection
+     * @param conf server configuration
      *
      * @return zk client instance
      */
-    private ZooKeeper newZookeeper(final String zkServers,
-            final int sessionTimeout) throws IOException, InterruptedException,
+    private ZooKeeper newZookeeper(ServerConfiguration conf) throws IOException, InterruptedException,
             KeeperException {
-        ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout()) {
+        Set<Watcher> watchers = new HashSet<Watcher>();
+        watchers.add(new Watcher() {
             @Override
             public void process(WatchedEvent event) {
                 // Check for expired connection.
                 if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
                     LOG.error("ZK client connection to the ZK server has expired!");
                     shutdown(ExitCode.ZK_EXPIRED);
-                } else {
-                    super.process(event);
                 }
             }
-        };
-        return ZkUtils.createConnectedZookeeperClient(zkServers, w);
+        });
+        return ZooKeeperClient.newBuilder()
+                .connectString(conf.getZkServers())
+                .sessionTimeoutMs(conf.getZkTimeout())
+                .watchers(watchers)
+                .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(),
+                        conf.getZkTimeout(), Integer.MAX_VALUE))
+                .build();
     }
 
     public boolean isRunning() {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 48a76fb..c7a65a1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -239,7 +239,10 @@ public class BookieShell implements Tool {
             // delete cookie
             if (cmdLine.hasOption("d")) {
                 ZooKeeperClient zkc =
-                        ZooKeeperClient.createConnectedZooKeeperClient(conf.getZkServers(), conf.getZkTimeout());
+                        ZooKeeperClient.newBuilder()
+                                .connectString(conf.getZkServers())
+                                .sessionTimeoutMs(conf.getZkTimeout())
+                                .build();
                 try {
                     Versioned<Cookie> cookie = Cookie.readFromZooKeeper(zkc, conf);
                     cookie.getValue().deleteFromZooKeeper(zkc, conf, cookie.getVersion());
@@ -418,8 +421,10 @@ public class BookieShell implements Tool {
         int runCmd(CommandLine cmdLine) throws Exception {
             ZooKeeper zk = null;
             try {
-                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
-                zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+                zk = ZooKeeperClient.newBuilder()
+                        .connectString(bkConf.getZkServers())
+                        .sessionTimeoutMs(bkConf.getZkTimeout())
+                        .build();
                 LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
                 LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
                 Iterator<Long> iter = underreplicationManager.listLedgersToRereplicate();
@@ -453,8 +458,10 @@ public class BookieShell implements Tool {
         public int runCmd(CommandLine cmdLine) throws Exception {
             ZooKeeper zk = null;
             try {
-                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
-                zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+                zk = ZooKeeperClient.newBuilder()
+                        .connectString(bkConf.getZkServers())
+                        .sessionTimeoutMs(bkConf.getZkTimeout())
+                        .build();
                 LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
                 LedgerManager m = mFactory.newLedgerManager();
                 LedgerRangeIterator iter = m.getLedgerRanges();
@@ -560,8 +567,10 @@ public class BookieShell implements Tool {
 
             ZooKeeper zk = null;
             try {
-                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
-                zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+                zk = ZooKeeperClient.newBuilder()
+                        .connectString(bkConf.getZkServers())
+                        .sessionTimeoutMs(bkConf.getZkTimeout())
+                        .build();
                 LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
                 LedgerManager m = mFactory.newLedgerManager();
                 ReadMetadataCallback cb = new ReadMetadataCallback(lid);
@@ -960,8 +969,10 @@ public class BookieShell implements Tool {
             }
             ZooKeeper zk = null;
             try {
-                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
-                zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+                zk = ZooKeeperClient.newBuilder()
+                        .connectString(bkConf.getZkServers())
+                        .sessionTimeoutMs(bkConf.getZkTimeout())
+                        .build();
                 LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
                 LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
                 if (enable) {
@@ -1018,8 +1029,10 @@ public class BookieShell implements Tool {
         int runCmd(CommandLine cmdLine) throws Exception {
             ZooKeeper zk = null;
             try {
-                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
-                zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+                zk = ZooKeeperClient.newBuilder()
+                        .connectString(bkConf.getZkServers())
+                        .sessionTimeoutMs(bkConf.getZkTimeout())
+                        .build();
                 BookieSocketAddress bookieId = AuditorElector.getCurrentAuditor(bkConf, zk);
                 if (bookieId == null) {
                     LOG.info("No auditor elected");
@@ -1093,9 +1106,11 @@ public class BookieShell implements Tool {
         private int updateBookieIdInCookie(final String bookieId, final boolean useHostname) throws IOException,
                 InterruptedException {
             ZooKeeper zk = null;
-            ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
             try {
-                zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+                zk = ZooKeeperClient.newBuilder()
+                        .connectString(bkConf.getZkServers())
+                        .sessionTimeoutMs(bkConf.getZkTimeout())
+                        .build();
                 ServerConfiguration conf = new ServerConfiguration(bkConf);
                 String newBookieId = Bookie.getBookieAddress(conf).toString();
                 // read oldcookie

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
index f6ec59c..4ec3add 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
@@ -21,27 +21,28 @@
 
 package org.apache.bookkeeper.bookie;
 
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.HardLink;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import java.net.MalformedURLException;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
@@ -49,15 +50,6 @@ import java.util.ArrayList;
 import java.util.Scanner;
 import java.util.NoSuchElementException;
 
-import java.net.MalformedURLException;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.commons.configuration.ConfigurationException;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.KeeperException;
-
 import static com.google.common.base.Charsets.UTF_8;
 
 /**
@@ -139,26 +131,19 @@ public class FileSystemUpgrade {
     private static ZooKeeper newZookeeper(final ServerConfiguration conf)
             throws BookieException.UpgradeException {
         try {
-            final CountDownLatch latch = new CountDownLatch(1);
-            ZooKeeper zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(),
-                    new Watcher() {
-                        @Override
-                        public void process(WatchedEvent event) {
-                            // handle session disconnects and expires
-                            if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
-                                latch.countDown();
-                            }
-                        }
-                    });
-            if (!latch.await(conf.getZkTimeout()*2, TimeUnit.MILLISECONDS)) {
-                zk.close();
-                throw new BookieException.UpgradeException("Couldn't connect to zookeeper");
-            }
-            return zk;
+            int zkTimeout = conf.getZkTimeout();
+            return ZooKeeperClient.newBuilder()
+                    .connectString(conf.getZkServers())
+                    .sessionTimeoutMs(zkTimeout)
+                    .operationRetryPolicy(
+                            new BoundExponentialBackoffRetryPolicy(zkTimeout, zkTimeout, Integer.MAX_VALUE))
+                    .build();
         } catch (InterruptedException ie) {
             throw new BookieException.UpgradeException(ie);
         } catch (IOException ioe) {
             throw new BookieException.UpgradeException(ioe);
+        } catch (KeeperException ke) {
+            throw new BookieException.UpgradeException(ke);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 49d8e59..f74639b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Preconditions;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -42,11 +43,12 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
+import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.slf4j.Logger;
@@ -135,29 +137,8 @@ public class BookKeeper {
         }
 
         public BookKeeper build() throws IOException, InterruptedException, KeeperException {
-            boolean ownZK = false;
-            boolean ownChannelFactory = false;
-            if (zk == null) {
-                ownZK = true;
-                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout());
-                zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), w);
-                w.waitForConnection();
-            }
-            if (channelFactory == null) {
-                ownChannelFactory = true;
-                ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
-                channelFactory = new NioClientSocketChannelFactory(
-                        Executors.newCachedThreadPool(tfb.setNameFormat(
-                                                              "BookKeeper-NIOBoss-%d").build()),
-                        Executors.newCachedThreadPool(tfb.setNameFormat(
-                                                              "BookKeeper-NIOWorker-%d").build()));
-            }
-
-            BookKeeper bk = new BookKeeper(conf, zk, channelFactory, statsLogger);
-            bk.ownZKHandle = ownZK;
-            bk.ownChannelFactory = ownChannelFactory;
-
-            return bk;
+            Preconditions.checkNotNull(statsLogger, "No stats logger provided");
+            return new BookKeeper(conf, zk, channelFactory, statsLogger);
         }
     }
 
@@ -196,34 +177,18 @@ public class BookKeeper {
      */
     public BookKeeper(final ClientConfiguration conf)
             throws IOException, InterruptedException, KeeperException {
-        this.conf = conf;
-        ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout());
-        this.zk = ZkUtils
-                .createConnectedZookeeperClient(conf.getZkServers(), w);
-        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
-        this.channelFactory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(tfb.setNameFormat(
-                        "BookKeeper-NIOBoss-%d").build()),
-                Executors.newCachedThreadPool(tfb.setNameFormat(
-                        "BookKeeper-NIOWorker-%d").build()));
-        this.scheduler = Executors.newSingleThreadScheduledExecutor(tfb
-                .setNameFormat("BookKeeperClientScheduler-%d").build());
-        this.statsLogger = NullStatsLogger.INSTANCE;
-        initOpLoggers(this.statsLogger);
-        // initialize the ensemble placement
-        this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
-
-        mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads(),
-                "BookKeeperClientWorker");
-        bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
-        bookieWatcher = new BookieWatcher(conf, scheduler, placementPolicy, this);
-        bookieWatcher.readBookiesBlocking();
+        this(conf, null, null, NullStatsLogger.INSTANCE);
+    }
 
-        ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
-        ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
+    private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException {
+        Preconditions.checkNotNull(zk, "No zookeeper instance provided");
+        return zk;
+    }
 
-        ownChannelFactory = true;
-        ownZKHandle = true;
+    private static ClientSocketChannelFactory validateChannelFactory(ClientSocketChannelFactory factory)
+            throws NullPointerException {
+        Preconditions.checkNotNull(factory, "No Channel Factory provided");
+        return factory;
     }
 
     /**
@@ -243,12 +208,11 @@ public class BookKeeper {
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
             throws IOException, InterruptedException, KeeperException {
 
-        this(conf, zk, new NioClientSocketChannelFactory(
+        this(conf, validateZooKeeper(zk), new NioClientSocketChannelFactory(
                 Executors.newCachedThreadPool(new ThreadFactoryBuilder()
                         .setNameFormat("BookKeeper-NIOBoss-%d").build()),
                 Executors.newCachedThreadPool(new ThreadFactoryBuilder()
                         .setNameFormat("BookKeeper-NIOWorker-%d").build())));
-        ownChannelFactory = true;
     }
 
     /**
@@ -270,43 +234,77 @@ public class BookKeeper {
      */
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
             throws IOException, InterruptedException, KeeperException {
-        this(conf, zk, channelFactory, NullStatsLogger.INSTANCE);
+        this(conf, validateZooKeeper(zk), validateChannelFactory(channelFactory), NullStatsLogger.INSTANCE);
     }
 
     /**
      * Constructor for use with the builder. Other constructors also use it.
      */
-    private BookKeeper(ClientConfiguration conf, ZooKeeper zk,
-                       ClientSocketChannelFactory channelFactory, StatsLogger statsLogger)
+    private BookKeeper(ClientConfiguration conf,
+                       ZooKeeper zkc,
+                       ClientSocketChannelFactory channelFactory,
+                       StatsLogger statsLogger)
             throws IOException, InterruptedException, KeeperException {
-        if (zk == null || channelFactory == null) {
-            throw new NullPointerException();
+        this.conf = conf;
+
+        // initialize zookeeper client
+        if (zkc == null) {
+            this.zk = ZooKeeperClient.newBuilder()
+                    .connectString(conf.getZkServers())
+                    .sessionTimeoutMs(conf.getZkTimeout())
+                    .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(),
+                            conf.getZkTimeout(), 0))
+                    .statsLogger(statsLogger)
+                    .build();
+            this.ownZKHandle = true;
+        } else {
+            if (!zkc.getState().isConnected()) {
+                LOG.error("Unconnected zookeeper handle passed to bookkeeper");
+                throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+            }
+            this.zk = zkc;
+            this.ownZKHandle = false;
         }
-        if (!zk.getState().isConnected()) {
-            LOG.error("Unconnected zookeeper handle passed to bookkeeper");
-            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+
+        // initialize channel factory
+        if (null == channelFactory) {
+            ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+            this.channelFactory = new NioClientSocketChannelFactory(
+                    Executors.newCachedThreadPool(tfb.setNameFormat(
+                            "BookKeeper-NIOBoss-%d").build()),
+                    Executors.newCachedThreadPool(tfb.setNameFormat(
+                            "BookKeeper-NIOWorker-%d").build()));
+            this.ownChannelFactory = true;
+        } else {
+            this.channelFactory = channelFactory;
+            this.ownChannelFactory = false;
         }
-        this.conf = conf;
-        this.zk = zk;
-        this.channelFactory = channelFactory;
+
+        // initialize scheduler
         ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
                 "BookKeeperClientScheduler-%d");
         this.scheduler = Executors
                 .newSingleThreadScheduledExecutor(tfb.build());
+
+        // initialize stats logger
         this.statsLogger = statsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE);
         initOpLoggers(this.statsLogger);
 
         // initialize the ensemble placement
         this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
 
-        mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads(),
+        // initialize main worker pool
+        this.mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads(),
                 "BookKeeperClientWorker");
-        bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool, statsLogger);
-        bookieWatcher = new BookieWatcher(conf, scheduler, placementPolicy, this);
-        bookieWatcher.readBookiesBlocking();
 
-        ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
-        ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
+        // initialize bookie client
+        this.bookieClient = new BookieClient(conf, this.channelFactory, this.mainWorkerPool, statsLogger);
+        this.bookieWatcher = new BookieWatcher(conf, this.scheduler, this.placementPolicy, this);
+        this.bookieWatcher.readBookiesBlocking();
+
+        // initialize ledger manager
+        this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
+        this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
     }
 
     private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index fbca7d2..38b21e2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -31,8 +31,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -126,8 +125,10 @@ public class BookKeeperAdmin {
      */
     public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException {
         // Create the ZooKeeper client instance
-        ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout());
-        zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), w);
+        zk = ZooKeeperClient.newBuilder()
+                .connectString(conf.getZkServers())
+                .sessionTimeoutMs(conf.getZkTimeout())
+                .build();
         ownsZK = true;
 
         // Create the bookie path
@@ -762,9 +763,10 @@ public class BookKeeperAdmin {
      */
     public static boolean format(ClientConfiguration conf,
             boolean isInteractive, boolean force) throws Exception {
-        ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout());
-        ZooKeeper zkc = ZkUtils.createConnectedZookeeperClient(
-                conf.getZkServers(), w);
+        ZooKeeper zkc = ZooKeeperClient.newBuilder()
+                .connectString(conf.getZkServers())
+                .sessionTimeoutMs(conf.getZkTimeout())
+                .build();
         BookKeeper bkc = null;
         try {
             boolean ledgerRootExists = null != zkc.exists(

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 4e6e3fb..924e620 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.zookeeper.AsyncCallback;
@@ -407,8 +408,10 @@ public class Auditor implements BookiesListener {
      */
     void checkAllLedgers() throws BKAuditException, BKException,
             IOException, InterruptedException, KeeperException {
-        ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout());
-        ZooKeeper newzk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), w);
+        ZooKeeper newzk = ZooKeeperClient.newBuilder()
+                .connectString(conf.getZkServers())
+                .sessionTimeoutMs(conf.getZkTimeout())
+                .build();
 
         final BookKeeper client = new BookKeeper(new ClientConfiguration(conf),
                                                  newzk);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index dd08f71..4a84b3c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.replication;
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.util.HashSet;
+import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -35,6 +37,7 @@ import org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
@@ -78,20 +81,27 @@ public class AutoRecoveryMain {
             throws IOException, InterruptedException, KeeperException, UnavailableException,
             CompatibilityException {
         this.conf = conf;
-        ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout()) {
+        Set<Watcher> watchers = new HashSet<Watcher>();
+        // TODO: better session handling for auto recovery daemon  https://issues.apache.org/jira/browse/BOOKKEEPER-594
+        //       since {@link org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager}
+        //       uses Watcher, we need to ensure the logic works correctly after recreating
+        //       a new zookeeper client when the session expires.
+        //       For now, just shut it down.
+        watchers.add(new Watcher() {
             @Override
             public void process(WatchedEvent event) {
                 // Check for expired connection.
                 if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
-                    LOG.error("ZK client connection to the"
-                            + " ZK server has expired!");
+                    LOG.error("ZK client connection to the ZK server has expired!");
                     shutdown(ExitCode.ZK_EXPIRED);
-                } else {
-                    super.process(event);
                 }
             }
-        };
-        zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), w);
+        });
+        zk = ZooKeeperClient.newBuilder()
+                .connectString(conf.getZkServers())
+                .sessionTimeoutMs(conf.getZkTimeout())
+                .watchers(watchers)
+                .build();
         auditorElector = new AuditorElector(Bookie.getBookieAddress(conf).toString(), conf,
                 zk, statsLogger.scope(AUDITOR_SCOPE));
         replicationWorker = new ReplicationWorker(zk, conf,

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
index 0937bb5..fb0064a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
@@ -25,11 +25,10 @@ import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -40,13 +39,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -109,10 +105,10 @@ public class LocalBookKeeper {
         LOG.info("Instantiate ZK Client");
         //initialize the zk client with values
         try {
-            ZKConnectionWatcher zkConnectionWatcher = new ZKConnectionWatcher();
-            zkc = new ZooKeeper(HOSTPORT, zkSessionTimeOut,
-                    zkConnectionWatcher);
-            zkConnectionWatcher.waitForConnection();
+            zkc = ZooKeeperClient.newBuilder()
+                    .connectString(HOSTPORT)
+                    .sessionTimeoutMs(zkSessionTimeOut)
+                    .build();
             zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             // No need to create an entry for each requested bookie anymore as the
@@ -228,32 +224,6 @@ public class LocalBookKeeper {
         System.err.println("Usage: LocalBookKeeper number-of-bookies");
     }
 
-    /* Watching SyncConnected event from ZooKeeper */
-    static class ZKConnectionWatcher implements Watcher {
-        private CountDownLatch clientConnectLatch = new CountDownLatch(1);
-
-        @Override
-        public void process(WatchedEvent event) {
-            if (event.getState() == KeeperState.SyncConnected) {
-                clientConnectLatch.countDown();
-            }
-        }
-
-        // Waiting for the SyncConnected event from the ZooKeeper server
-        public void waitForConnection() throws IOException {
-            try {
-                if (!clientConnectLatch.await(zkSessionTimeOut,
-                        TimeUnit.MILLISECONDS)) {
-                    throw new IOException(
-                            "Couldn't connect to zookeeper server");
-                }
-            } catch (InterruptedException e) {
-                throw new IOException(
-                        "Interrupted when connecting to zookeeper server", e);
-            }
-        }
-    }
-
     public static boolean waitForServerUp(String hp, long timeout) {
         long start = MathUtils.now();
         String split[] = hp.split(":");

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
index 4cff2bb..6aa9073 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.bookkeeper.util;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Provides misc math functions that don't come standard
  */
@@ -75,6 +77,17 @@ public class MathUtils {
     }
 
     /**
+     * Microseconds elapsed since the time specified, the input is nanoTime
+     * the only conversion happens when computing the elapsed time
+     *
+     * @param startNanoTime the start of the interval that we are measuring
+     * @return elapsed time in microseconds.
+     */
+    public static long elapsedMicroSec(long startNanoTime) {
+        return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanoTime);
+    }
+
+    /**
      * Nanoseconds elapsed since the time specified, the input is nanoTime
      * the only conversion happens when computing the elapsed time
      *

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
index 9608240..f237988 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
@@ -24,11 +24,12 @@ package org.apache.bookkeeper.util;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.AsyncCallback;
@@ -230,28 +231,4 @@ public class ZkUtils {
         }, null);
     }
 
-    /**
-     * Get new ZooKeeper client. Waits till the connection is complete. If
-     * connection is not successful within timeout, then throws back exception.
-     *
-     * @param servers
-     *            ZK servers connection string.
-     * @param timeout
-     *            Session timeout.
-     */
-    public static ZooKeeper createConnectedZookeeperClient(String servers,
-            ZooKeeperWatcherBase w) throws IOException, InterruptedException,
-            KeeperException {
-        if (servers == null || servers.isEmpty()) {
-            throw new IllegalArgumentException("servers cannot be empty");
-        }
-        final ZooKeeper newZk = new ZooKeeper(servers, w.getZkSessionTimeOut(),
-                w);
-        w.waitForConnection();
-        // Re-checking zookeeper connection status
-        if (!newZk.getState().isConnected()) {
-            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
-        }
-        return newZk;
-    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java
index dab7aa7..23d86f4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java
@@ -41,7 +41,7 @@ public class ExponentialBackoffRetryPolicy implements RetryPolicy {
 
     @Override
     public long nextRetryWaitTime(int retryCount, long elapsedRetryTime) {
-        return baseBackoffTime * Math.max(1, random.nextInt(1 << (retryCount + 1)));
+        return baseBackoffTime * Math.max(1, random.nextInt(Math.max(1, 1 << (retryCount + 1))));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a80bba27/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
index a479a36..80a853b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
@@ -31,6 +31,14 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooWorker.ZooCallable;
 import org.apache.zookeeper.AsyncCallback.ACLCallback;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
@@ -61,6 +69,8 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
 
     final static Logger logger = LoggerFactory.getLogger(ZooKeeperClient.class);
 
+    private static final int DEFAULT_RETRY_EXECUTOR_THREAD_COUNT = 1;
+
     // ZooKeeper client connection variables
     private final String connectString;
     private final int sessionTimeoutMs;
@@ -72,10 +82,26 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
     private final ScheduledExecutorService retryExecutor;
     private final ExecutorService connectExecutor;
 
+    // rate limiter
+    private final RateLimiter rateLimiter;
+
     // retry polices
     private final RetryPolicy connectRetryPolicy;
     private final RetryPolicy operationRetryPolicy;
 
+    // Stats Logger
+    private final OpStatsLogger createStats;
+    private final OpStatsLogger getStats;
+    private final OpStatsLogger setStats;
+    private final OpStatsLogger deleteStats;
+    private final OpStatsLogger getChildrenStats;
+    private final OpStatsLogger existsStats;
+    private final OpStatsLogger multiStats;
+    private final OpStatsLogger getACLStats;
+    private final OpStatsLogger setACLStats;
+    private final OpStatsLogger syncStats;
+    private final OpStatsLogger createClientStats;
+
     private final Callable<ZooKeeper> clientCreator = new Callable<ZooKeeper>() {
 
         @Override
@@ -86,6 +112,8 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     @Override
                     public ZooKeeper call() throws KeeperException, InterruptedException {
                         logger.info("Reconnecting zookeeper {}.", connectString);
+                        // close the previous one
+                        closeZkHandle();
                         ZooKeeper newZk;
                         try {
                             newZk = createZooKeeper();
@@ -93,13 +121,10 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                             logger.error("Failed to create zookeeper instance to " + connectString, ie);
                             throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
                         }
-                        // close the previous one
-                        closeZkHandle();
+                        waitForConnection();
                         zk.set(newZk);
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("ZooKeeper session {} is created to {}.",
-                                    Long.toHexString(newZk.getSessionId()), connectString);
-                        }
+                        logger.info("ZooKeeper session {} is created to {}.",
+                                Long.toHexString(newZk.getSessionId()), connectString);
                         return newZk;
                     }
 
@@ -108,7 +133,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                         return String.format("ZooKeeper Client Creator (%s)", connectString);
                     }
 
-                }, connectRetryPolicy);
+                }, connectRetryPolicy, rateLimiter, createClientStats);
             } catch (Exception e) {
                 logger.error("Gave up reconnecting to ZooKeeper : ", e);
                 Runtime.getRuntime().exit(-1);
@@ -118,99 +143,156 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
 
     };
 
-    public static ZooKeeper createConnectedZooKeeper(String connectString, int sessionTimeoutMs)
+    @VisibleForTesting
+    static ZooKeeperClient createConnectedZooKeeperClient(
+            String connectString, int sessionTimeoutMs, Set<Watcher> childWatchers,
+            RetryPolicy operationRetryPolicy)
                     throws KeeperException, InterruptedException, IOException {
-        ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(sessionTimeoutMs);
-        ZooKeeper zk = new ZooKeeper(connectString, sessionTimeoutMs, watcher);
-        try {
-            watcher.waitForConnection();
-        } catch (KeeperException ke) {
-            zk.close();
-            throw ke;
-        } catch (InterruptedException ie) {
-            zk.close();
-            throw ie;
-        }
-        return zk;
+        return ZooKeeperClient.newBuilder()
+                .connectString(connectString)
+                .sessionTimeoutMs(sessionTimeoutMs)
+                .watchers(childWatchers)
+                .operationRetryPolicy(operationRetryPolicy)
+                .build();
     }
 
-    public static ZooKeeperClient createConnectedZooKeeperClient(String connectString, int sessionTimeoutMs)
-                    throws KeeperException, InterruptedException, IOException {
-        ZooKeeperWatcherBase watcherManager = new ZooKeeperWatcherBase(sessionTimeoutMs);
-        ZooKeeperClient client = new ZooKeeperClient(connectString, sessionTimeoutMs, watcherManager,
-                new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0));
-        try {
-            watcherManager.waitForConnection();
-        } catch (KeeperException ke) {
-            client.close();
-            throw ke;
-        } catch (InterruptedException ie) {
-            client.close();
-            throw ie;
+    public static class Builder {
+        String connectString = null;
+        int sessionTimeoutMs = 10000;
+        Set<Watcher> watchers = null;
+        RetryPolicy connectRetryPolicy = null;
+        RetryPolicy operationRetryPolicy = null;
+        StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+        int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT;
+        double requestRateLimit = 0;
+
+        private Builder() {}
+
+        public Builder connectString(String connectString) {
+            this.connectString = connectString;
+            return this;
         }
-        return client;
-    }
 
-    public static ZooKeeperClient createConnectedZooKeeperClient(
-            String connectString, int sessionTimeoutMs, RetryPolicy operationRetryPolicy)
-                    throws KeeperException, InterruptedException, IOException {
-        ZooKeeperWatcherBase watcherManager = new ZooKeeperWatcherBase(sessionTimeoutMs); 
-        ZooKeeperClient client = new ZooKeeperClient(connectString, sessionTimeoutMs, watcherManager,
-                operationRetryPolicy);
-        try {
-            watcherManager.waitForConnection();
-        } catch (KeeperException ke) {
-            client.close();
-            throw ke;
-        } catch (InterruptedException ie) {
-            client.close();
-            throw ie;
+        public Builder sessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
         }
-        return client;
-    }
 
-    public static ZooKeeperClient createConnectedZooKeeperClient(
-            String connectString, int sessionTimeoutMs, Set<Watcher> childWatchers,
-            RetryPolicy operationRetryPolicy)
-                    throws KeeperException, InterruptedException, IOException {
-        ZooKeeperWatcherBase watcherManager =
-                new ZooKeeperWatcherBase(sessionTimeoutMs, childWatchers);
-        ZooKeeperClient client = new ZooKeeperClient(connectString, sessionTimeoutMs, watcherManager,
-                operationRetryPolicy);
-        try {
-            watcherManager.waitForConnection();
-        } catch (KeeperException ke) {
-            client.close();
-            throw ke;
-        } catch (InterruptedException ie) {
-            client.close();
-            throw ie;
+        public Builder watchers(Set<Watcher> watchers) {
+            this.watchers = watchers;
+            return this;
+        }
+
+        public Builder connectRetryPolicy(RetryPolicy retryPolicy) {
+            this.connectRetryPolicy = retryPolicy;
+            return this;
+        }
+
+        public Builder operationRetryPolicy(RetryPolicy retryPolicy) {
+            this.operationRetryPolicy = retryPolicy;
+            return this;
+        }
+
+        public Builder statsLogger(StatsLogger statsLogger) {
+            this.statsLogger = statsLogger;
+            return this;
+        }
+
+        public Builder requestRateLimit(double requestRateLimit) {
+            this.requestRateLimit = requestRateLimit;
+            return this;
+        }
+
+        public Builder retryThreadCount(int numThreads) {
+            this.retryExecThreadCount = numThreads;
+            return this;
+        }
+
+        public ZooKeeperClient build() throws IOException, KeeperException, InterruptedException {
+            Preconditions.checkNotNull(connectString);
+            Preconditions.checkArgument(sessionTimeoutMs > 0);
+            Preconditions.checkNotNull(statsLogger);
+            Preconditions.checkArgument(retryExecThreadCount > 0);
+
+            if (null == connectRetryPolicy) {
+                connectRetryPolicy =
+                        new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE);
+            }
+            if (null == operationRetryPolicy) {
+                operationRetryPolicy =
+                        new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0);
+            }
+
+            // Create a watcher manager
+            StatsLogger watcherStatsLogger = statsLogger.scope("watcher");
+            ZooKeeperWatcherBase watcherManager =
+                    null == watchers ? new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) :
+                            new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStatsLogger);
+            ZooKeeperClient client = new ZooKeeperClient(
+                    connectString,
+                    sessionTimeoutMs,
+                    watcherManager,
+                    connectRetryPolicy,
+                    operationRetryPolicy,
+                    statsLogger,
+                    retryExecThreadCount,
+                    requestRateLimit
+            );
+            // Wait for connection to be established.
+            try {
+                watcherManager.waitForConnection();
+            } catch (KeeperException ke) {
+                client.close();
+                throw ke;
+            } catch (InterruptedException ie) {
+                client.close();
+                throw ie;
+            }
+            return client;
         }
-        return client;
     }
 
-    ZooKeeperClient(String connectString, int sessionTimeoutMs, ZooKeeperWatcherBase watcherManager,
-            RetryPolicy operationRetryPolicy) throws IOException {
-        this(connectString, sessionTimeoutMs, watcherManager,
-                new BoundExponentialBackoffRetryPolicy(6000, 60000, Integer.MAX_VALUE),
-                operationRetryPolicy);
+    public static Builder newBuilder() {
+        return new Builder();
     }
 
-    private ZooKeeperClient(String connectString, int sessionTimeoutMs,
-            ZooKeeperWatcherBase watcherManager,
-            RetryPolicy connectRetryPolicy, RetryPolicy operationRetryPolicy) throws IOException {
+    ZooKeeperClient(String connectString,
+                    int sessionTimeoutMs,
+                    ZooKeeperWatcherBase watcherManager,
+                    RetryPolicy connectRetryPolicy,
+                    RetryPolicy operationRetryPolicy,
+                    StatsLogger statsLogger,
+                    int retryExecThreadCount,
+                    double rate) throws IOException {
         super(connectString, sessionTimeoutMs, watcherManager);
         this.connectString = connectString;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.watcherManager = watcherManager;
         this.connectRetryPolicy = connectRetryPolicy;
         this.operationRetryPolicy = operationRetryPolicy;
+        this.rateLimiter = rate > 0 ? RateLimiter.create(rate) : null;
         this.retryExecutor =
-                Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+                Executors.newScheduledThreadPool(retryExecThreadCount,
+                    new ThreadFactoryBuilder().setNameFormat("ZKC-retry-executor-%d").build());
         this.connectExecutor =
-                Executors.newSingleThreadExecutor();
+                Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder().setNameFormat("ZKC-connect-executor-%d").build());
         // added itself to the watcher
         watcherManager.addChildWatcher(this);
+
+        // Stats
+        StatsLogger scopedStatsLogger = statsLogger.scope("zk");
+        createClientStats = scopedStatsLogger.getOpStatsLogger("create_client");
+        createStats = scopedStatsLogger.getOpStatsLogger("create");
+        getStats = scopedStatsLogger.getOpStatsLogger("get_data");
+        setStats = scopedStatsLogger.getOpStatsLogger("set_data");
+        deleteStats = scopedStatsLogger.getOpStatsLogger("delete");
+        getChildrenStats = scopedStatsLogger.getOpStatsLogger("get_children");
+        existsStats = scopedStatsLogger.getOpStatsLogger("exists");
+        multiStats = scopedStatsLogger.getOpStatsLogger("multi");
+        getACLStats = scopedStatsLogger.getOpStatsLogger("get_acl");
+        setACLStats = scopedStatsLogger.getOpStatsLogger("set_acl");
+        syncStats = scopedStatsLogger.getOpStatsLogger("sync");
     }
 
     @Override
@@ -219,7 +301,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
         retryExecutor.shutdown();
         closeZkHandle();
     }
-    
+
     private void closeZkHandle() throws InterruptedException {
         ZooKeeper zkHandle = zk.get();
         if (null == zkHandle) {
@@ -246,28 +328,40 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
     }
 
     private void onExpired() {
-        if (logger.isDebugEnabled()) {
-            logger.debug("ZooKeeper session {} is expired from {}.",
-                    Long.toHexString(getSessionId()), connectString);
-        }
+        logger.info("ZooKeeper session {} is expired from {}.",
+                Long.toHexString(getSessionId()), connectString);
         try {
             connectExecutor.submit(clientCreator);
         } catch (RejectedExecutionException ree) {
             logger.error("ZooKeeper reconnect task is rejected : ", ree);
+        } catch (Exception t) {
+            logger.error("Failed to submit zookeeper reconnect task due to runtime exception : ", t);
         }
     }
 
-
-    static abstract class RetryRunnable implements Runnable {
+    static abstract class ZkRetryRunnable implements Runnable {
 
         final ZooWorker worker;
+        final RateLimiter rateLimiter;
         final Runnable that;
 
-        RetryRunnable(RetryPolicy retryPolicy) {
-            worker = new ZooWorker(retryPolicy);
+        ZkRetryRunnable(RetryPolicy retryPolicy,
+                        RateLimiter rateLimiter,
+                        OpStatsLogger statsLogger) {
+            this.worker = new ZooWorker(retryPolicy, statsLogger);
+            this.rateLimiter = rateLimiter;
             that = this;
         }
 
+        @Override
+        public void run() {
+            if (null != rateLimiter) {
+                rateLimiter.acquire();
+            }
+            zkRun();
+        }
+
+        abstract void zkRun();
     }
 
     // inherits from ZooKeeper client for all operations
@@ -319,6 +413,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
         return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<OpResult>>() {
 
             @Override
+            public String toString() {
+                return "multi";
+            }
+
+            @Override
             public List<OpResult> call() throws KeeperException, InterruptedException {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
@@ -327,11 +426,10 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.multi(ops);
             }
 
-        }, operationRetryPolicy);
+        }, operationRetryPolicy, rateLimiter, multiStats);
     }
 
     @Override
-    @Deprecated
     public Transaction transaction() {
         // since there is no reference about which client that the transaction could use
         // so just use ZooKeeper instance directly.
@@ -348,6 +446,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
         return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<ACL>>() {
 
             @Override
+            public String toString() {
+                return String.format("getACL (%s, stat = %s)", path, stat);
+            }
+
+            @Override
             public List<ACL> call() throws KeeperException, InterruptedException {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
@@ -356,12 +459,12 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.getACL(path, stat);
             }
 
-        }, operationRetryPolicy);
+        }, operationRetryPolicy, rateLimiter, getACLStats);
     }
 
     @Override
     public void getACL(final String path, final Stat stat, final ACLCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getACLStats) {
 
             final ACLCallback aclCb = new ACLCallback() {
 
@@ -378,7 +481,12 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            public String toString() {
+                return String.format("getACL (%s, stat = %s)", path, stat);
+            }
+
+            @Override
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.getACL(path, stat, aclCb, worker);
@@ -397,6 +505,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
         return ZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
 
             @Override
+            public String toString() {
+                return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version);
+            }
+
+            @Override
             public Stat call() throws KeeperException, InterruptedException {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
@@ -405,13 +518,13 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.setACL(path, acl, version);
             }
 
-        }, operationRetryPolicy);
+        }, operationRetryPolicy, rateLimiter, setACLStats);
     }
 
     @Override
     public void setACL(final String path, final List<ACL> acl, final int version,
             final StatCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setACLStats) {
 
             final StatCallback stCb = new StatCallback() {
 
@@ -428,7 +541,12 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            public String toString() {
+                return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version);
+            }
+
+            @Override
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.setACL(path, acl, version, stCb, worker);
@@ -443,7 +561,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
 
     @Override
     public void sync(final String path, final VoidCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, syncStats) {
 
             final VoidCallback vCb = new VoidCallback() {
 
@@ -460,7 +578,12 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            public String toString() {
+                return String.format("sync (%s)", path);
+            }
+
+            @Override
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.sync(path, vCb, worker);
@@ -508,13 +631,18 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.create(path, data, acl, createMode);
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
+            }
+
+        }, operationRetryPolicy, rateLimiter, createStats);
     }
 
     @Override
     public void create(final String path, final byte[] data, final List<ACL> acl,
             final CreateMode createMode, final StringCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) {
 
             final StringCallback createCb = new StringCallback() {
 
@@ -531,7 +659,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker);
@@ -539,6 +667,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.create(path, data, acl, createMode, createCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
+            }
         };
         // execute it immediately
         proc.run();
@@ -559,12 +692,17 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return null;
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("delete (%s, version = %d)", path, version);
+            }
+
+        }, operationRetryPolicy, rateLimiter, deleteStats);
     }
 
     @Override
     public void delete(final String path, final int version, final VoidCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, deleteStats) {
 
             final VoidCallback deleteCb = new VoidCallback() {
 
@@ -581,7 +719,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.delete(path, version, deleteCb, worker);
@@ -589,6 +727,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.delete(path, version, deleteCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("delete (%s, version = %d)", path, version);
+            }
         };
         // execute it immediately
         proc.run();
@@ -607,7 +750,12 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.exists(path, watcher);
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, watcher);
+            }
+
+        }, operationRetryPolicy, rateLimiter, existsStats);
     }
 
     @Override
@@ -623,12 +771,17 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.exists(path, watch);
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, watch);
+            }
+
+        }, operationRetryPolicy, rateLimiter, existsStats);
     }
 
     @Override
     public void exists(final String path, final Watcher watcher, final StatCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, existsStats) {
 
             final StatCallback stCb = new StatCallback() {
 
@@ -645,7 +798,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.exists(path, watcher, stCb, worker);
@@ -653,6 +806,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.exists(path, watcher, stCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, watcher);
+            }
         };
         // execute it immediately
         proc.run();
@@ -660,7 +818,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
 
     @Override
     public void exists(final String path, final boolean watch, final StatCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, existsStats) {
 
             final StatCallback stCb = new StatCallback() {
 
@@ -677,7 +835,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.exists(path, watch, stCb, worker);
@@ -685,6 +843,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.exists(path, watch, stCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, watch);
+            }
         };
         // execute it immediately
         proc.run();
@@ -704,7 +867,12 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.getData(path, watcher, stat);
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, watcher);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getStats);
     }
 
     @Override
@@ -721,12 +889,17 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.getData(path, watch, stat);
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, watch);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getStats);
     }
 
     @Override
     public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getStats) {
 
             final DataCallback dataCb = new DataCallback() {
 
@@ -743,7 +916,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.getData(path, watcher, dataCb, worker);
@@ -751,6 +924,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.getData(path, watcher, dataCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, watcher);
+            }
         };
         // execute it immediately
         proc.run();
@@ -758,7 +936,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
 
     @Override
     public void getData(final String path, final boolean watch, final DataCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getStats) {
 
             final DataCallback dataCb = new DataCallback() {
 
@@ -775,7 +953,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.getData(path, watch, dataCb, worker);
@@ -783,6 +961,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.getData(path, watch, dataCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, watch);
+            }
         };
         // execute it immediately
         proc.run();
@@ -802,13 +985,18 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.setData(path, data, version);
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("setData (%s, version = %d)", path, version);
+            }
+
+        }, operationRetryPolicy, rateLimiter, setStats);
     }
 
     @Override
     public void setData(final String path, final byte[] data, final int version,
             final StatCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) {
 
             final StatCallback stCb = new StatCallback() {
 
@@ -825,7 +1013,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.setData(path, data, version, stCb, worker);
@@ -833,6 +1021,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.setData(path, data, version, stCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("setData (%s, version = %d)", path, version);
+            }
         };
         // execute it immediately
         proc.run();
@@ -852,7 +1045,12 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.getChildren(path, watcher, stat);
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, watcher);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getChildrenStats);
     }
 
     @Override
@@ -869,13 +1067,18 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.getChildren(path, watch, stat);
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, watch);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getChildrenStats);
     }
 
     @Override
     public void getChildren(final String path, final Watcher watcher,
             final Children2Callback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
 
             final Children2Callback childCb = new Children2Callback() {
 
@@ -893,7 +1096,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.getChildren(path, watcher, childCb, worker);
@@ -901,6 +1104,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.getChildren(path, watcher, childCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, watcher);
+            }
         };
         // execute it immediately
         proc.run();
@@ -909,7 +1117,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
     @Override
     public void getChildren(final String path, final boolean watch, final Children2Callback cb,
             final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
 
             final Children2Callback childCb = new Children2Callback() {
 
@@ -927,7 +1135,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.getChildren(path, watch, childCb, worker);
@@ -935,6 +1143,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.getChildren(path, watch, childCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, watch);
+            }
         };
         // execute it immediately
         proc.run();
@@ -955,7 +1168,12 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.getChildren(path, watcher);
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, watcher);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getChildrenStats);
     }
 
     @Override
@@ -972,13 +1190,18 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                 return zkHandle.getChildren(path, watch);
             }
 
-        }, operationRetryPolicy);
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, watch);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getChildrenStats);
     }
 
     @Override
     public void getChildren(final String path, final Watcher watcher,
             final ChildrenCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
 
             final ChildrenCallback childCb = new ChildrenCallback() {
 
@@ -996,7 +1219,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.getChildren(path, watcher, childCb, worker);
@@ -1004,6 +1227,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.getChildren(path, watcher, childCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, watcher);
+            }
         };
         // execute it immediately
         proc.run();
@@ -1012,7 +1240,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
     @Override
     public void getChildren(final String path, final boolean watch,
             final ChildrenCallback cb, final Object context) {
-        final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
 
             final ChildrenCallback childCb = new ChildrenCallback() {
 
@@ -1030,7 +1258,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
             };
 
             @Override
-            public void run() {
+            void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
                     ZooKeeperClient.super.getChildren(path, watch, childCb, worker);
@@ -1038,6 +1266,11 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
                     zkHandle.getChildren(path, watch, childCb, worker);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, watch);
+            }
         };
         // execute it immediately
         proc.run();


Mime
View raw message