bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eolive...@apache.org
Subject [bookkeeper] branch master updated: ISSUE #506: BP-15 New CreateLedger API
Date Fri, 13 Oct 2017 16:14:51 GMT
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new bd50cac  ISSUE #506: BP-15 New CreateLedger API
bd50cac is described below

commit bd50cacd2b759d53b06a2356c5a9369b6100d635
Author: Enrico Olivelli <eolivelli@apache.org>
AuthorDate: Fri Oct 13 18:14:29 2017 +0200

    ISSUE #506: BP-15 New CreateLedger API
    
    BP-15 New CreateLedeger API
    Introduce new org.apache.bookkeeper.client.api package
    Introduce new Mockito based test suite for BookKeeper client
    
    Author: Enrico Olivelli <eolivelli@apache.org>
    
    Reviewers: Sijie Guo <sijie.apache.org>, Ivan Kelly <ivank@apache.org>
    
    This closes #549 from eolivelli/bp15-createledger-api-2, closes #506
---
 .../common/stats/BroadCastStatsLogger.java         | 208 ++++++++++
 .../java/org/apache/bookkeeper/util/MathUtils.java |   4 +-
 .../org/apache/bookkeeper/util/package-info.java   |   9 +-
 .../bookkeeper/bookie/IndexPersistenceMgr.java     |   8 +-
 .../org/apache/bookkeeper/client/BKException.java  | 197 +---------
 .../org/apache/bookkeeper/client/BookKeeper.java   | 174 +++++----
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  38 +-
 .../apache/bookkeeper/client/BookieWatcher.java    |   4 +-
 .../bookkeeper/client/ExplicitLacFlushPolicy.java  |   6 +-
 .../apache/bookkeeper/client/LedgerCreateOp.java   | 205 +++++++++-
 .../apache/bookkeeper/client/LedgerDeleteOp.java   |  70 +++-
 .../org/apache/bookkeeper/client/LedgerEntry.java  |  11 +-
 .../client/LedgerFragmentReplicator.java           |   2 +-
 .../org/apache/bookkeeper/client/LedgerHandle.java | 242 ++++++------
 .../apache/bookkeeper/client/LedgerHandleAdv.java  |  35 +-
 .../apache/bookkeeper/client/LedgerMetadata.java   |   4 +-
 .../org/apache/bookkeeper/client/LedgerOpenOp.java |  97 ++++-
 .../apache/bookkeeper/client/LedgerRecoveryOp.java |   2 +-
 .../org/apache/bookkeeper/client/PendingAddOp.java |   6 +-
 .../apache/bookkeeper/client/PendingReadLacOp.java |   2 +-
 .../apache/bookkeeper/client/PendingReadOp.java    |   9 +-
 .../bookkeeper/client/PendingWriteLacOp.java       |   2 +-
 .../client/ReadLastConfirmedAndEntryOp.java        |   4 +-
 .../bookkeeper/client/ReadLastConfirmedOp.java     |   4 +-
 .../bookkeeper/client/ReadOnlyLedgerHandle.java    |   2 +-
 .../bookkeeper/client/SyncCallbackUtils.java       | 311 +++++++++++++++
 .../bookkeeper/client/SynchCallbackUtils.java      |  69 ----
 .../bookkeeper/client/TryReadLastConfirmedOp.java  |   2 +-
 .../apache/bookkeeper/client/api/BKException.java  | 236 +++++++++++
 .../apache/bookkeeper/client/api/BookKeeper.java   |  73 ++++
 .../bookkeeper/client/api/BookKeeperBuilder.java   | 104 +++++
 .../bookkeeper/client/api/CreateAdvBuilder.java    |  40 ++
 .../bookkeeper/client/api/CreateBuilder.java       |  98 +++++
 .../bookkeeper/client/api/DeleteBuilder.java       |  39 ++
 .../apache/bookkeeper/client/api/DigestType.java   |  38 ++
 .../org/apache/bookkeeper/client/api/Handle.java   |  67 ++++
 .../apache/bookkeeper/client/api/LedgerEntry.java  |  76 ++++
 .../apache/bookkeeper/client/api/OpBuilder.java    |  42 ++
 .../apache/bookkeeper/client/api/OpenBuilder.java  |  71 ++++
 .../apache/bookkeeper/client/api/ReadHandle.java   | 101 +++++
 .../bookkeeper/client/api/WriteAdvHandle.java      |  59 +++
 .../apache/bookkeeper/client/api/WriteHandle.java  |  55 +++
 .../apache/bookkeeper/client/api/package-info.java |  27 ++
 .../client/impl/BookKeeperBuilderImpl.java         |  98 +++++
 .../bookkeeper/client/impl/package-info.java       |  26 ++
 .../bookkeeper/client/BookieRecoveryTest.java      |   2 +-
 .../bookkeeper/client/BookieWriteLedgerTest.java   |   6 +-
 .../bookkeeper/client/MockBookKeeperTestCase.java  | 432 +++++++++++++++++++++
 .../client/ParallelLedgerRecoveryTest.java         |   2 +-
 .../bookkeeper/client/TestDelayEnsembleChange.java |   4 +-
 .../bookkeeper/client/api/BookKeeperApiTest.java   | 265 +++++++++++++
 .../client/api/BookKeeperBuildersTest.java         | 318 +++++++++++++++
 .../main/resources/bookkeeper/findbugsExclude.xml  |  16 +
 .../resources/bookkeeper/server-suppressions.xml   |   2 +-
 pom.xml                                            |   5 +-
 55 files changed, 3465 insertions(+), 564 deletions(-)

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java
new file mode 100644
index 0000000..1b9fdea
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.CachingStatsLogger;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsData;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Stats Loggers that broadcast stats to multiple {@link StatsLogger}.
+ */
+public class BroadCastStatsLogger {
+
+    /**
+     * Create a broadcast stats logger of two stats loggers `<code>first</code>` and
+     * `<code>second</code>`. The returned stats logger doesn't allow registering any
+     * {@link Gauge}.
+     *
+     * @param first
+     *          first stats logger
+     * @param second
+     *          second stats logger
+     * @return broadcast stats logger
+     */
+    public static StatsLogger two(StatsLogger first, StatsLogger second) {
+        return new CachingStatsLogger(new Two(first, second));
+    }
+
+    static class Two implements StatsLogger {
+        protected final StatsLogger first;
+        protected final StatsLogger second;
+
+        private Two(StatsLogger first, StatsLogger second) {
+            super();
+            checkNotNull(first);
+            checkNotNull(second);
+            this.first = first;
+            this.second = second;
+        }
+
+        @Override
+        public OpStatsLogger getOpStatsLogger(final String statName) {
+            final OpStatsLogger firstLogger = first.getOpStatsLogger(statName);
+            final OpStatsLogger secondLogger = second.getOpStatsLogger(statName);
+            return new OpStatsLogger() {
+
+                @Override
+                public void registerFailedEvent(long l, TimeUnit timeUnit) {
+                    firstLogger.registerFailedEvent(l, timeUnit);
+                    secondLogger.registerFailedEvent(l, timeUnit);
+                }
+
+                @Override
+                public void registerSuccessfulEvent(long l, TimeUnit timeUnit) {
+                    firstLogger.registerSuccessfulEvent(l, timeUnit);
+                    secondLogger.registerSuccessfulEvent(l, timeUnit);
+                }
+
+                @Override
+                public void registerSuccessfulValue(long l) {
+                    firstLogger.registerSuccessfulValue(l);
+                    secondLogger.registerSuccessfulValue(l);
+                }
+
+                @Override
+                public void registerFailedValue(long l) {
+                    firstLogger.registerFailedValue(l);
+                    secondLogger.registerFailedValue(l);
+                }
+
+                @Override
+                public OpStatsData toOpStatsData() {
+                    // Eventually consistent.
+                    return firstLogger.toOpStatsData();
+                }
+
+                @Override
+                public void clear() {
+                    firstLogger.clear();
+                    secondLogger.clear();
+                }
+            };
+        }
+
+        @Override
+        public Counter getCounter(final String statName) {
+            final Counter firstCounter = first.getCounter(statName);
+            final Counter secondCounter = second.getCounter(statName);
+            return new Counter() {
+                @Override
+                public void clear() {
+                    firstCounter.clear();
+                    secondCounter.clear();
+                }
+
+                @Override
+                public void inc() {
+                    firstCounter.inc();
+                    secondCounter.inc();
+                }
+
+                @Override
+                public void dec() {
+                    firstCounter.dec();
+                    secondCounter.dec();
+                }
+
+                @Override
+                public void add(long l) {
+                    firstCounter.add(l);
+                    secondCounter.add(l);
+                }
+
+                @Override
+                public Long get() {
+                    // Eventually consistent.
+                    return firstCounter.get();
+                }
+            };
+        }
+
+        @Override
+        public <T extends Number> void registerGauge(String statName, Gauge<T> gauge) {
+            // Different underlying stats loggers have different semantics wrt. gauge registration.
+            throw new RuntimeException("Cannot register a gauge on BroadCastStatsLogger.Two");
+        }
+
+        @Override
+        public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) {
+            // no-op
+        }
+
+        @Override
+        public StatsLogger scope(final String scope) {
+            return new Two(first.scope(scope), second.scope(scope));
+        }
+
+        @Override
+        public void removeScope(String scope, StatsLogger statsLogger) {
+            if (!(statsLogger instanceof Two)) {
+                return;
+            }
+
+            Two another = (Two) statsLogger;
+
+            first.removeScope(scope, another.first);
+            second.removeScope(scope, another.second);
+        }
+    }
+
+    /**
+     * Create a broadcast stats logger of two stats loggers <code>master</code> and <code>slave</code>.
+     * It is similar as {@link #two(StatsLogger, StatsLogger)}, but it allows registering {@link Gauge}s.
+     * The {@link Gauge} will be registered under master.
+     *
+     * @param master
+     *          master stats logger to receive {@link Counter}, {@link OpStatsLogger} and {@link Gauge}.
+     * @param slave
+     *          slave stats logger to receive only {@link Counter} and {@link OpStatsLogger}.
+     * @return broadcast stats logger
+     */
+    public static StatsLogger masterslave(StatsLogger master, StatsLogger slave) {
+        return new CachingStatsLogger(new MasterSlave(master, slave));
+    }
+
+    static class MasterSlave extends Two {
+
+        private MasterSlave(StatsLogger master, StatsLogger slave) {
+            super(master, slave);
+        }
+
+        @Override
+        public <T extends Number> void registerGauge(String statName, Gauge<T> gauge) {
+            first.registerGauge(statName, gauge);
+        }
+
+        @Override
+        public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) {
+            first.unregisterGauge(statName, gauge);
+        }
+
+        @Override
+        public StatsLogger scope(String scope) {
+            return new MasterSlave(first.scope(scope), second.scope(scope));
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/util/MathUtils.java
similarity index 85%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/util/MathUtils.java
index d497b04..0124b19 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/util/MathUtils.java
@@ -18,9 +18,9 @@
 package org.apache.bookkeeper.util;
 
 /**
- * Provides misc math functions that don't come standard
+ * Provides misc math functions that don't come standard.
  *
- * @Deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.MathUtils}.
+ * <p>Deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.MathUtils}.
  */
 public class MathUtils extends org.apache.bookkeeper.common.util.MathUtils {
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/util/package-info.java
similarity index 78%
rename from bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
rename to bookkeeper-common/src/main/java/org/apache/bookkeeper/util/package-info.java
index d497b04..28180d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/util/package-info.java
@@ -15,12 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.util;
-
 /**
- * Provides misc math functions that don't come standard
- *
- * @Deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.MathUtils}.
+ * defines the utilities used across the project.
  */
-public class MathUtils extends org.apache.bookkeeper.common.util.MathUtils {
-}
+package org.apache.bookkeeper.util;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 6281eea..3120fa4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -196,8 +196,8 @@ public class IndexPersistenceMgr {
                 fileInfoLock.writeLock().lock();
                 try {
                     // We only close the fileInfo when we evict the FileInfo from both cache
-                    if (!readFileInfoCache.asMap().containsKey(ledgerId) &&
-                        !writeFileInfoCache.asMap().containsKey(ledgerId)) {
+                    if (!readFileInfoCache.asMap().containsKey(ledgerId)
+                            && !writeFileInfoCache.asMap().containsKey(ledgerId)) {
                         fileInfo.close(true);
                     }
                 } catch (IOException e) {
@@ -404,10 +404,10 @@ public class IndexPersistenceMgr {
         // because metadata will be recovered from the journal when we restart anyway.
         try {
             fileInfoLock.writeLock().lock();
-            for(Map.Entry<Long, FileInfo> entry : writeFileInfoCache.asMap().entrySet()) {
+            for (Map.Entry<Long, FileInfo> entry : writeFileInfoCache.asMap().entrySet()) {
                 entry.getValue().close(false);
             }
-            for(Map.Entry<Long, FileInfo> entry : readFileInfoCache.asMap().entrySet()) {
+            for (Map.Entry<Long, FileInfo> entry : readFileInfoCache.asMap().entrySet()) {
                 entry.getValue().close(false);
             }
             writeFileInfoCache.invalidateAll();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 406d552..ea60a72 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -20,20 +20,16 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.lang.Exception;
-
 /**
- * Class the enumerates all the possible error conditions
+ * Class the enumerates all the possible error conditions.
  *
+ * <P>This class is going to be deprecate soon, please use the new class {@link BKException}
  */
-
 @SuppressWarnings("serial")
-public abstract class BKException extends Exception {
-
-    private int code;
+public abstract class BKException extends org.apache.bookkeeper.client.api.BKException {
 
     BKException(int code) {
-        this.code = code;
+        super(code);
     }
 
     /**
@@ -111,189 +107,10 @@ public abstract class BKException extends Exception {
     }
 
     /**
-     * Codes which represent the various {@link BKException} types.
+     * Legacy interface which holds constants for BookKeeper error codes.
+     * The list has been moved to {@link BKException}
      */
-    public interface Code {
-        /** A placer holder (unused) */
-        int UNINITIALIZED = 1;
-        /** Everything is OK */
-        int OK = 0;
-        /** Read operations failed (bookie error) */
-        int ReadException = -1;
-        /** Unused */
-        int QuorumException = -2;
-        /** Unused */
-        int NoBookieAvailableException = -3;
-        /** Digest Manager is not initialized (client error) */
-        int DigestNotInitializedException = -4;
-        /** Digest doesn't match on returned entries */
-        int DigestMatchException = -5;
-        /** Not enough bookies available to form an ensemble */
-        int NotEnoughBookiesException = -6;
-        /** No such ledger exists */
-        int NoSuchLedgerExistsException = -7;
-        /** Bookies are not available */
-        int BookieHandleNotAvailableException = -8;
-        /** ZooKeeper operations failed */
-        int ZKException = -9;
-        /** Ledger recovery operations failed */
-        int LedgerRecoveryException = -10;
-        /** Executing operations on a closed ledger handle */
-        int LedgerClosedException = -11;
-        /** Write operations failed (bookie error) */
-        int WriteException = -12;
-        /** No such entry exists */
-        int NoSuchEntryException = -13;
-        /** Incorrect parameters (operations are absolutely not executed) */
-        int IncorrectParameterException = -14;
-        /** Synchronous operations are interrupted */
-        int InterruptedException = -15;
-        /** Protocol version is wrong (operations are absolutely not executed) */
-        int ProtocolVersionException = -16;
-        /** Bad version on executing metadata operations */
-        int MetadataVersionException = -17;
-        /** Meta store operations failed */
-        int MetaStoreException = -18;
-        /** Executing operations on a closed client */
-        int ClientClosedException = -19;
-        /** Ledger already exists */
-        int LedgerExistException = -20;
-        /**
-         * Add entry operation timeouts on waiting quorum responses.
-         *
-         * @since 4.5
-         */
-        int AddEntryQuorumTimeoutException = -21;
-        /**
-         * Duplicated entry id is found when {@link LedgerHandleAdv#addEntry(long, byte[])}.
-         *
-         * @since 4.5
-         */
-        int DuplicateEntryIdException = -22;
-        /**
-         * Operations timeouts.
-         *
-         * @since 4.5
-         */
-        int TimeoutException = -23;
-        int SecurityException = -24;
-
-        /**
-         * Operation is illegal.
-         */
-        int IllegalOpException = -100;
-        /**
-         * Operations failed due to ledgers are fenced.
-         */
-        int LedgerFencedException = -101;
-        /**
-         * Operations failed due to unauthorized.
-         */
-        int UnauthorizedAccessException = -102;
-        /**
-         * Replication failed due to unclosed fragments.
-         */
-        int UnclosedFragmentException = -103;
-        /**
-         * Write operations failed due to bookies are readonly
-         */
-        int WriteOnReadOnlyBookieException = -104;
-        //-105 reserved for TooManyRequestsException
-        /**
-         * Ledger id overflow happens on ledger manager.
-         *
-         * @since 4.5
-         */
-        int LedgerIdOverflowException = -106;
-
-        /**
-         * generic exception code used to propagate in replication pipeline
-         */
-        int ReplicationException = -200;
-
-        /**
-         * Unexpected condition.
-         */
-        int UnexpectedConditionException = -999;
-    }
-
-    public void setCode(int code) {
-        this.code = code;
-    }
-
-    public int getCode() {
-        return this.code;
-    }
-
-    public static String getMessage(int code) {
-        switch (code) {
-        case Code.OK:
-            return "No problem";
-        case Code.ReadException:
-            return "Error while reading ledger";
-        case Code.QuorumException:
-            return "Invalid quorum size on ensemble size";
-        case Code.NoBookieAvailableException:
-            return "Invalid quorum size on ensemble size";
-        case Code.DigestNotInitializedException:
-            return "Digest engine not initialized";
-        case Code.DigestMatchException:
-            return "Entry digest does not match";
-        case Code.NotEnoughBookiesException:
-            return "Not enough non-faulty bookies available";
-        case Code.NoSuchLedgerExistsException:
-            return "No such ledger exists";
-        case Code.BookieHandleNotAvailableException:
-            return "Bookie handle is not available";
-        case Code.ZKException:
-            return "Error while using ZooKeeper";
-        case Code.MetaStoreException:
-            return "Error while using MetaStore";
-        case Code.LedgerExistException:
-            return "Ledger existed";
-        case Code.LedgerRecoveryException:
-            return "Error while recovering ledger";
-        case Code.LedgerClosedException:
-            return "Attempt to write to a closed ledger";
-        case Code.WriteException:
-            return "Write failed on bookie";
-        case Code.NoSuchEntryException:
-            return "No such entry";
-        case Code.IncorrectParameterException:
-            return "Incorrect parameter input";
-        case Code.InterruptedException:
-            return "Interrupted while waiting for permit";
-        case Code.ProtocolVersionException:
-            return "Bookie protocol version on server is incompatible with client";
-        case Code.MetadataVersionException:
-            return "Bad ledger metadata version";
-        case Code.DuplicateEntryIdException:
-            return "Attempted to add Duplicate entryId";
-        case Code.LedgerFencedException:
-            return "Ledger has been fenced off. Some other client must have opened it to read";
-        case Code.UnauthorizedAccessException:
-            return "Attempted to access ledger using the wrong password";
-        case Code.UnclosedFragmentException:
-            return "Attempting to use an unclosed fragment; This is not safe";
-        case Code.WriteOnReadOnlyBookieException:
-            return "Attempting to write on ReadOnly bookie";
-        case Code.LedgerIdOverflowException:
-            return "Next ledgerID is too large.";
-        case Code.ReplicationException:
-            return "Errors in replication pipeline";
-        case Code.ClientClosedException:
-            return "BookKeeper client is closed";
-        case Code.IllegalOpException:
-            return "Invalid operation";
-        case Code.AddEntryQuorumTimeoutException:
-            return "Add entry quorum wait timed out";
-        case Code.TimeoutException:
-            return "Bookie operation timeout";
-        case Code.SecurityException:
-            return "Failed to establish a secure connection";
-        default:
-            return "Unexpected condition";
-        }
+    public interface Code extends org.apache.bookkeeper.client.api.BKException.Code {
     }
 
     public static class BKSecurityException extends BKException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 8a96ebe..8716c3a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -43,6 +43,14 @@ import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateAdvCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncDeleteCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncOpenCallback;
+import org.apache.bookkeeper.client.api.BookKeeperBuilder;
+import org.apache.bookkeeper.client.api.CreateBuilder;
+import org.apache.bookkeeper.client.api.DeleteBuilder;
+import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
@@ -82,7 +90,7 @@ import org.slf4j.LoggerFactory;
  * <p>The exceptions resulting from synchronous calls and error code resulting from
  * asynchronous calls can be found in the class {@link BKException}.
  */
-public class BookKeeper implements AutoCloseable {
+public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
 
     static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
 
@@ -147,6 +155,8 @@ public class BookKeeper implements AutoCloseable {
 
     /**
      * BookKeeper Client Builder to build client instances.
+     *
+     * @see BookKeeperBuilder
      */
     public static class Builder {
         final ClientConfiguration conf;
@@ -571,11 +581,47 @@ public class BookKeeper implements AutoCloseable {
     LedgerManager getUnderlyingLedgerManager() {
         return ((CleanupLedgerManager) ledgerManager).getUnderlying();
     }
-    
+
+    @VisibleForTesting
     LedgerIdGenerator getLedgerIdGenerator() {
         return ledgerIdGenerator;
     }
 
+    @VisibleForTesting
+    ReentrantReadWriteLock getCloseLock() {
+        return closeLock;
+    }
+
+    @VisibleForTesting
+    boolean isClosed() {
+        return closed;
+    }
+
+    @VisibleForTesting
+    BookieWatcher getBookieWatcher() {
+        return bookieWatcher;
+    }
+
+    @VisibleForTesting
+    OrderedSafeExecutor getMainWorkerPool() {
+        return mainWorkerPool;
+    }
+
+    @VisibleForTesting
+    ScheduledExecutorService getScheduler() {
+        return scheduler;
+    }
+
+    @VisibleForTesting
+    EnsemblePlacementPolicy getPlacementPolicy() {
+        return placementPolicy;
+    }
+
+    @VisibleForTesting
+    boolean isReorderReadSequence() {
+        return reorderReadSequence;
+    }
+
     /**
      * There are 2 digest types that can be used for verification. The CRC32 is
      * cheap to compute but does not protect against byzantine bookies (i.e., a
@@ -584,7 +630,11 @@ public class BookKeeper implements AutoCloseable {
      * report fake bytes with a mathching MAC unless it knows the password
      */
     public enum DigestType {
-        MAC, CRC32
+        MAC, CRC32;
+
+        public static DigestType fromApiDigestType(org.apache.bookkeeper.client.api.DigestType digestType) {
+            return digestType == org.apache.bookkeeper.client.api.DigestType.MAC ? MAC : CRC32;
+        }
     }
 
     ZooKeeper getZkHandle() {
@@ -788,15 +838,16 @@ public class BookKeeper implements AutoCloseable {
     public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize,
                                      DigestType digestType, byte passwd[], final Map<String, byte[]> customMetadata)
             throws InterruptedException, BKException {
-        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        SyncCreateCallback result = new SyncCreateCallback(future);
 
         /*
          * Calls asynchronous version
          */
         asyncCreateLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
-                          new SyncCreateCallback(), counter, customMetadata);
+                          result, null, customMetadata);
 
-        LedgerHandle lh = SynchCallbackUtils.waitForResult(counter);
+        LedgerHandle lh = SyncCallbackUtils.waitForResult(future);
         if (lh == null) {
             LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
             throw BKException.create(BKException.Code.UnexpectedConditionException);
@@ -816,7 +867,7 @@ public class BookKeeper implements AutoCloseable {
      * @param ackQuorumSize
      * @param digestType
      * @param passwd
-     * @param customMetadata
+     *
      * @return a handle to the newly created ledger
      * @throws InterruptedException
      * @throws BKException
@@ -847,15 +898,16 @@ public class BookKeeper implements AutoCloseable {
     public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize,
                                         DigestType digestType, byte passwd[], final Map<String, byte[]> customMetadata)
             throws InterruptedException, BKException {
-        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+        CompletableFuture<LedgerHandleAdv> future = new CompletableFuture<>();
+        SyncCreateAdvCallback result = new SyncCreateAdvCallback(future);
 
         /*
          * Calls asynchronous version
          */
         asyncCreateLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
-                             new SyncCreateCallback(), counter, customMetadata);
+                             result, null, customMetadata);
 
-        LedgerHandle lh = SynchCallbackUtils.waitForResult(counter);
+        LedgerHandle lh = SyncCallbackUtils.waitForResult(future);
         if (lh == null) {
             LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
             throw BKException.create(BKException.Code.UnexpectedConditionException);
@@ -938,15 +990,16 @@ public class BookKeeper implements AutoCloseable {
                                         DigestType digestType,
                                         byte passwd[],
                                         final Map<String, byte[]> customMetadata) throws InterruptedException, BKException{
-        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+        CompletableFuture<LedgerHandleAdv> future = new CompletableFuture<>();
+        SyncCreateAdvCallback result = new SyncCreateAdvCallback(future);
 
         /*
          * Calls asynchronous version
          */
         asyncCreateLedgerAdv(ledgerId, ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
-                             new SyncCreateCallback(), counter, customMetadata);
+                             result, null, customMetadata);
 
-        LedgerHandle lh = SynchCallbackUtils.waitForResult(counter);
+        LedgerHandle lh = SyncCallbackUtils.waitForResult(future);
         if (lh == null) {
             LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
             throw BKException.create(BKException.Code.UnexpectedConditionException);
@@ -1120,17 +1173,17 @@ public class BookKeeper implements AutoCloseable {
      * @throws InterruptedException
      * @throws BKException
      */
-
     public LedgerHandle openLedger(long lId, DigestType digestType, byte passwd[])
             throws BKException, InterruptedException {
-        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        SyncOpenCallback result = new SyncOpenCallback(future);
 
         /*
          * Calls async open ledger
          */
-        asyncOpenLedger(lId, digestType, passwd, new SyncOpenCallback(), counter);
+        asyncOpenLedger(lId, digestType, passwd, result, null);
 
-        return SynchCallbackUtils.waitForResult(counter);
+        return SyncCallbackUtils.waitForResult(future);
     }
 
     /**
@@ -1147,18 +1200,18 @@ public class BookKeeper implements AutoCloseable {
      * @throws InterruptedException
      * @throws BKException
      */
-
     public LedgerHandle openLedgerNoRecovery(long lId, DigestType digestType, byte passwd[])
             throws BKException, InterruptedException {
-        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        SyncOpenCallback result = new SyncOpenCallback(future);
 
         /*
          * Calls async open ledger
          */
         asyncOpenLedgerNoRecovery(lId, digestType, passwd,
-                                  new SyncOpenCallback(), counter);
+                                  result, null);
 
-        return SynchCallbackUtils.waitForResult(counter);
+        return SyncCallbackUtils.waitForResult(future);
     }
 
     /**
@@ -1195,12 +1248,14 @@ public class BookKeeper implements AutoCloseable {
      * @throws BKException.BKNoSuchLedgerExistsException if the ledger doesn't exist
      * @throws BKException
      */
+    @SuppressWarnings("unchecked")
     public void deleteLedger(long lId) throws InterruptedException, BKException {
-        CompletableFuture<Void> counter = new CompletableFuture<>();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        SyncDeleteCallback result = new SyncDeleteCallback(future);
         // Call asynchronous version
-        asyncDeleteLedger(lId, new SyncDeleteCallback(), counter);
+        asyncDeleteLedger(lId, result, null);
 
-        SynchCallbackUtils.waitForResult(counter);
+        SyncCallbackUtils.waitForResult(future);
     }
 
     /**
@@ -1269,7 +1324,8 @@ public class BookKeeper implements AutoCloseable {
      * Shuts down client.
      *
      */
-    public void close() throws InterruptedException, BKException {
+    @Override
+    public void close() throws BKException, InterruptedException {
         closeLock.writeLock().lock();
         try {
             if (closed) {
@@ -1320,58 +1376,6 @@ public class BookKeeper implements AutoCloseable {
         }
     }
 
-    private static class SyncCreateCallback implements CreateCallback {
-        /**
-         * Create callback implementation for synchronous create call.
-         *
-         * @param rc
-         *          return code
-         * @param lh
-         *          ledger handle object
-         * @param ctx
-         *          optional control object
-         */
-        @Override
-        @SuppressWarnings("unchecked")
-        public void createComplete(int rc, LedgerHandle lh, Object ctx) {
-            SynchCallbackUtils.finish(rc, lh, (CompletableFuture<LedgerHandle>) ctx);
-        }
-    }
-
-    static class SyncOpenCallback implements OpenCallback {
-        /**
-         * Callback method for synchronous open operation
-         *
-         * @param rc
-         *          return code
-         * @param lh
-         *          ledger handle
-         * @param ctx
-         *          optional control object
-         */
-        @Override
-        @SuppressWarnings("unchecked")
-        public void openComplete(int rc, LedgerHandle lh, Object ctx) {
-            SynchCallbackUtils.finish(rc, lh, (CompletableFuture<LedgerHandle>) ctx);
-        }
-    }
-
-    private static class SyncDeleteCallback implements DeleteCallback {
-        /**
-         * Delete callback implementation for synchronous delete call.
-         *
-         * @param rc
-         *            return code
-         * @param ctx
-         *            optional control object
-         */
-        @Override
-        @SuppressWarnings("unchecked")
-        public void deleteComplete(int rc, Object ctx) {
-            SynchCallbackUtils.finish(rc, null, (CompletableFuture<Void>) ctx);
-        }
-    }
-
     private final void initOpLoggers(StatsLogger stats) {
         createOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CREATE_OP);
         deleteOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.DELETE_OP);
@@ -1415,4 +1419,20 @@ public class BookKeeper implements AutoCloseable {
             return new NioEventLoopGroup(numThreads, threadFactory);
         }
     }
+
+    @Override
+    public CreateBuilder newCreateLedgerOp() {
+        return new LedgerCreateOp.CreateBuilderImpl(this);
+    }
+
+    @Override
+    public OpenBuilder newOpenLedgerOp() {
+        return new LedgerOpenOp.OpenBuilderImpl(this);
+    }
+
+    @Override
+    public DeleteBuilder newDeleteLedgerOp() {
+        return new LedgerDeleteOp.DeleteBuilderImpl(this);
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 8cf8833..e0bea68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
 import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.util.concurrent.AbstractFuture;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -42,12 +43,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
-
-import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
-import org.apache.bookkeeper.client.BookKeeper.SyncOpenCallback;
 import org.apache.bookkeeper.client.LedgerFragmentReplicator.SingleFragmentCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncOpenCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
@@ -80,8 +80,6 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.AbstractFuture;
-
 /**
  * Admin client for BookKeeper clusters
  */
@@ -293,11 +291,12 @@ public class BookKeeperAdmin implements AutoCloseable {
      */
     public LedgerHandle openLedger(final long lId) throws InterruptedException,
             BKException {
-        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        SyncOpenCallback result = new SyncOpenCallback(future);
 
-        new LedgerOpenOp(bkc, lId, new SyncOpenCallback(), counter).initiate();
+        new LedgerOpenOp(bkc, lId, result, null).initiate();
 
-        return SynchCallbackUtils.waitForResult(counter);
+        return SyncCallbackUtils.waitForResult(future);
     }
 
     /**
@@ -327,14 +326,16 @@ public class BookKeeperAdmin implements AutoCloseable {
      *            ledger identifier
      * @see BookKeeper#openLedgerNoRecovery
      */
+    @SuppressWarnings("unchecked")
     public LedgerHandle openLedgerNoRecovery(final long lId)
             throws InterruptedException, BKException {
-        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        SyncOpenCallback result = new SyncOpenCallback(future);
 
-        new LedgerOpenOp(bkc, lId, new SyncOpenCallback(), counter)
+        new LedgerOpenOp(bkc, lId, result, null)
                 .initiateWithoutRecovery();
 
-        return SynchCallbackUtils.waitForResult(counter);
+        return SyncCallbackUtils.waitForResult(future);
     }
 
     /**
@@ -403,12 +404,11 @@ public class BookKeeperAdmin implements AutoCloseable {
             }
             if (lastEntryId == -1 || nextEntryId <= lastEntryId) {
                 try {
-                    CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>();
+                    CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>();
 
-                    handle.asyncReadEntriesInternal(nextEntryId, nextEntryId, new LedgerHandle.SyncReadCallback(),
-                            counter);
+                    handle.asyncReadEntriesInternal(nextEntryId, nextEntryId, new SyncReadCallback(result), null);
 
-                    currentEntry = SynchCallbackUtils.waitForResult(counter).nextElement();
+                    currentEntry = SyncCallbackUtils.waitForResult(result).nextElement();
 
                     return true;
                 } catch (Exception e) {
@@ -879,8 +879,8 @@ public class BookKeeperAdmin implements AutoCloseable {
             final LedgerFragment ledgerFragment,
             final BookieSocketAddress targetBookieAddress)
             throws InterruptedException, BKException {
-        CompletableFuture<Void> counter = new CompletableFuture<>();
-        ResultCallBack resultCallBack = new ResultCallBack(counter);
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        ResultCallBack resultCallBack = new ResultCallBack(result);
         SingleFragmentCallback cb = new SingleFragmentCallback(resultCallBack,
                 lh, ledgerFragment.getFirstEntryId(), ledgerFragment
                         .getAddress(), targetBookieAddress);
@@ -888,7 +888,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieAddress);
 
         try {
-            SynchCallbackUtils.waitForResult(counter);
+            SyncCallbackUtils.waitForResult(result);
         } catch (BKException err) {
             throw BKException.create(bkc.getReturnRc(err.getCode()));
         }
@@ -905,7 +905,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         @Override
         @SuppressWarnings("unchecked")
         public void processResult(int rc, String s, Object ctx) {
-            SynchCallbackUtils.finish(rc, null, sync);
+            SyncCallbackUtils.finish(rc, null, sync);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index e15f49a..74fe089 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -187,7 +187,7 @@ class BookieWatcher implements Watcher, ChildrenCallback {
 
         // Update watcher outside ZK callback thread, to avoid deadlock in case some other
         // component is trying to do a blocking ZK operation
-        bk.mainWorkerPool.submitOrdered(path, safeRun(() -> {
+        bk.getMainWorkerPool().submitOrdered(path, safeRun(() -> {
             synchronized (BookieWatcher.this) {
                 Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
                 placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
@@ -242,7 +242,7 @@ class BookieWatcher implements Watcher, ChildrenCallback {
         final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
         readBookies(new ChildrenCallback() {
             public void processResult(int rc, String path, Object ctx, List<String> children) {
-                bk.mainWorkerPool.submitOrdered(path, safeRun(() -> {
+                bk.getMainWorkerPool().submitOrdered(path, safeRun(() -> {
                     BookieWatcher.this.processResult(rc, path, ctx, children);
                     queue.add(rc);
                 }));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index fb3ba79..5b211ba 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -24,7 +24,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.bookkeeper.client.LedgerHandle.LastAddConfirmedCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.LastAddConfirmedCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,7 +117,7 @@ interface ExplicitLacFlushPolicy {
                 }
             };
             try {
-                scheduledFuture = lh.bk.mainWorkerPool.scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
+                scheduledFuture = lh.bk.getMainWorkerPool().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
                         explicitLacIntervalInMs, explicitLacIntervalInMs, TimeUnit.MILLISECONDS);
             } catch (RejectedExecutionException re) {
                 LOG.error("Scheduling of ExplictLastAddConfirmedFlush for ledger: {} has failed because of {}",
@@ -136,7 +136,7 @@ interface ExplicitLacFlushPolicy {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Sending Explicit LAC: {}", explicitLac);
                 }
-                lh.bk.mainWorkerPool.submit(new SafeRunnable() {
+                lh.bk.getMainWorkerPool().submit(new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         ByteBuf toSend = lh.macManager.computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 376d716..4cc62a0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -23,12 +23,21 @@ package org.apache.bookkeeper.client;
 
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateAdvCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateCallback;
+import org.apache.bookkeeper.client.api.CreateAdvBuilder;
+import org.apache.bookkeeper.client.api.CreateBuilder;
+import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -45,16 +54,16 @@ class LedgerCreateOp implements GenericCallback<Void> {
 
     static final Logger LOG = LoggerFactory.getLogger(LedgerCreateOp.class);
 
-    CreateCallback cb;
-    LedgerMetadata metadata;
+    final CreateCallback cb;
+    final LedgerMetadata metadata;
     LedgerHandle lh;
-    Long ledgerId = -1L;
-    Object ctx;
-    byte[] passwd;
-    BookKeeper bk;
-    DigestType digestType;
-    long startTime;
-    OpStatsLogger createOpLogger;
+    long ledgerId = -1L;
+    final Object ctx;
+    final byte[] passwd;
+    final BookKeeper bk;
+    final DigestType digestType;
+    final long startTime;
+    final OpStatsLogger createOpLogger;
     boolean adv = false;
     boolean generateLedgerId = true;
 
@@ -105,7 +114,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
 
         ArrayList<BookieSocketAddress> ensemble;
         try {
-            ensemble = bk.bookieWatcher
+            ensemble = bk.getBookieWatcher()
                     .newEnsemble(metadata.getEnsembleSize(),
                             metadata.getWriteQuorumSize(),
                             metadata.getAckQuorumSize(),
@@ -200,4 +209,180 @@ class LedgerCreateOp implements GenericCallback<Void> {
         cb.createComplete(rc, lh, ctx);
     }
 
+    static class CreateBuilderImpl implements CreateBuilder {
+
+        private final BookKeeper bk;
+        private int builderEnsembleSize = 3;
+        private int builderAckQuorumSize = 2;
+        private int builderWriteQuorumSize = 2;
+        private byte[] builderPassword;
+        private org.apache.bookkeeper.client.api.DigestType builderDigestType
+            = org.apache.bookkeeper.client.api.DigestType.CRC32;
+        private Map<String, byte[]> builderCustomMetadata = Collections.emptyMap();
+
+        CreateBuilderImpl(BookKeeper bk) {
+            this.bk = bk;
+        }
+
+        @Override
+        public CreateBuilder withEnsembleSize(int ensembleSize) {
+            this.builderEnsembleSize = ensembleSize;
+            return this;
+        }
+
+        @Override
+        public CreateBuilder withWriteQuorumSize(int writeQuorumSize) {
+            this.builderWriteQuorumSize = writeQuorumSize;
+            return this;
+        }
+
+        @Override
+        public CreateBuilder withAckQuorumSize(int ackQuorumSize) {
+            this.builderAckQuorumSize = ackQuorumSize;
+            return this;
+        }
+
+        @Override
+        public CreateBuilder withPassword(byte[] password) {
+            this.builderPassword = password;
+            return this;
+        }
+
+        @Override
+        public CreateBuilder withCustomMetadata(Map<String, byte[]> customMetadata) {
+            this.builderCustomMetadata = customMetadata;
+            return this;
+        }
+
+        @Override
+        public CreateBuilder withDigestType(org.apache.bookkeeper.client.api.DigestType digestType) {
+            this.builderDigestType = digestType;
+            return this;
+        }
+
+        @Override
+        public CreateAdvBuilder makeAdv() {
+            return new CreateAdvBuilderImpl(this);
+        }
+
+        private boolean validate() {
+            if (builderWriteQuorumSize > builderEnsembleSize) {
+                LOG.error("invalid writeQuorumSize {} > ensembleSize {}", builderWriteQuorumSize, builderEnsembleSize);
+                return false;
+            }
+
+            if (builderAckQuorumSize > builderWriteQuorumSize) {
+                LOG.error("invalid ackQuorumSize {} > writeQuorumSize {}", builderAckQuorumSize, builderWriteQuorumSize);
+                return false;
+            }
+
+            if (builderAckQuorumSize <= 0) {
+                LOG.error("invalid ackQuorumSize {} <= 0", builderAckQuorumSize);
+                return false;
+            }
+
+            if (builderPassword == null) {
+                LOG.error("invalid null password");
+                return false;
+            }
+
+            if (builderDigestType == null) {
+                LOG.error("invalid null digestType");
+                return false;
+            }
+
+            if (builderCustomMetadata == null) {
+                LOG.error("invalid null customMetadata");
+                return false;
+            }
+
+            return true;
+        }
+
+        @Override
+        public CompletableFuture<WriteHandle> execute() {
+            CompletableFuture<WriteHandle> future = new CompletableFuture<>();
+            SyncCreateCallback callback = new SyncCreateCallback(future);
+            create(callback);
+            return future;
+        }
+
+        private void create(CreateCallback cb) {
+            if (!validate()) {
+                cb.createComplete(BKException.Code.IncorrectParameterException, null, null);
+                return;
+            }
+            LedgerCreateOp op = new LedgerCreateOp(bk, builderEnsembleSize,
+                builderWriteQuorumSize, builderAckQuorumSize, DigestType.fromApiDigestType(builderDigestType),
+                builderPassword, cb, null, builderCustomMetadata);
+            ReentrantReadWriteLock closeLock = bk.getCloseLock();
+            closeLock.readLock().lock();
+            try {
+                if (bk.isClosed()) {
+                    cb.createComplete(BKException.Code.ClientClosedException, null, null);
+                    return;
+                }
+                op.initiate();
+            } finally {
+                closeLock.readLock().unlock();
+            }
+        }
+    }
+
+    private static class CreateAdvBuilderImpl implements CreateAdvBuilder {
+
+        private Long builderLedgerId;
+        private final CreateBuilderImpl parent;
+
+         private CreateAdvBuilderImpl(CreateBuilderImpl parent) {
+            this.parent = parent;
+        }
+
+        @Override
+        public CreateAdvBuilder withLedgerId(long ledgerId) {
+            builderLedgerId = ledgerId;
+            return this;
+        }
+
+        @Override
+        public CompletableFuture<WriteAdvHandle> execute() {
+            CompletableFuture<WriteAdvHandle> future = new CompletableFuture<>();
+            SyncCreateAdvCallback callback = new SyncCreateAdvCallback(future);
+            create(callback);
+            return future;
+        }
+
+        private boolean validate() {
+            if (!parent.validate()) {
+                return false;
+            }
+            if (builderLedgerId != null && builderLedgerId < 0) {
+                LOG.error("invalid ledgerId {} < 0. Do not set en explicit value if you want automatic generation", builderLedgerId);
+                return false;
+            }
+            return true;
+        }
+
+        private void create(CreateCallback cb) {
+            if (!validate()) {
+                cb.createComplete(BKException.Code.IncorrectParameterException, null, null);
+                return;
+            }
+            LedgerCreateOp op = new LedgerCreateOp(parent.bk, parent.builderEnsembleSize,
+                    parent.builderWriteQuorumSize, parent.builderAckQuorumSize,
+                    DigestType.fromApiDigestType(parent.builderDigestType),
+                    parent.builderPassword, cb, null, parent.builderCustomMetadata);
+            ReentrantReadWriteLock closeLock = parent.bk.getCloseLock();
+            closeLock.readLock().lock();
+            try {
+                if (parent.bk.isClosed()) {
+                    cb.createComplete(BKException.Code.ClientClosedException, null, null);
+                    return;
+                }
+                op.initiateAdv(builderLedgerId == null ? -1L : builderLedgerId);
+            } finally {
+                closeLock.readLock().unlock();
+            }
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
index 50fe54a..eb7324c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
@@ -21,9 +21,13 @@
 
 package org.apache.bookkeeper.client;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncDeleteCallback;
+import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
@@ -39,12 +43,12 @@ class LedgerDeleteOp extends OrderedSafeGenericCallback<Void> {
 
     static final Logger LOG = LoggerFactory.getLogger(LedgerDeleteOp.class);
 
-    BookKeeper bk;
-    long ledgerId;
-    DeleteCallback cb;
-    Object ctx;
-    long startTime;
-    OpStatsLogger deleteOpLogger;
+    final BookKeeper bk;
+    final long ledgerId;
+    final DeleteCallback cb;
+    final Object ctx;
+    final long startTime;
+    final OpStatsLogger deleteOpLogger;
 
     /**
      * Constructor
@@ -59,7 +63,7 @@ class LedgerDeleteOp extends OrderedSafeGenericCallback<Void> {
      *            optional control object
      */
     LedgerDeleteOp(BookKeeper bk, long ledgerId, DeleteCallback cb, Object ctx) {
-        super(bk.mainWorkerPool, ledgerId);
+        super(bk.getMainWorkerPool(), ledgerId);
         this.bk = bk;
         this.ledgerId = ledgerId;
         this.cb = cb;
@@ -94,4 +98,56 @@ class LedgerDeleteOp extends OrderedSafeGenericCallback<Void> {
     public String toString() {
         return String.format("LedgerDeleteOp(%d)", ledgerId);
     }
+
+    static class DeleteBuilderImpl  implements DeleteBuilder {
+
+        private Long builderLedgerId;
+        private final BookKeeper bk;
+
+        DeleteBuilderImpl(BookKeeper bk) {
+            this.bk = bk;
+        }
+
+        @Override
+        public DeleteBuilder withLedgerId(long ledgerId) {
+            this.builderLedgerId = ledgerId;
+            return this;
+        }
+
+        @Override
+        public CompletableFuture<Void> execute() {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            SyncDeleteCallback result = new SyncDeleteCallback(future);
+            delete(builderLedgerId, result);
+            return future;
+        }
+
+        private boolean validate() {
+            if (builderLedgerId == null || builderLedgerId < 0) {
+                LOG.error("invalid ledgerId {} < 0", builderLedgerId);
+                return false;
+            }
+            return true;
+        }
+
+        private void delete(Long ledgerId, AsyncCallback.DeleteCallback cb) {
+            if (!validate()) {
+                cb.deleteComplete(BKException.Code.IncorrectParameterException, null);
+                return;
+            }
+            LedgerDeleteOp op = new LedgerDeleteOp(bk, ledgerId, cb, null);
+            ReentrantReadWriteLock closeLock = bk.getCloseLock();
+            closeLock.readLock().lock();
+            try {
+                if (bk.isClosed()) {
+                    cb.deleteComplete(BKException.Code.ClientClosedException, null);
+                    return;
+                }
+                op.initiate();
+            } finally {
+                closeLock.readLock().unlock();
+            }
+        }
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
index 24d2ff4..4e39e51 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
@@ -32,8 +32,10 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
  * the entry content.
  *
  */
-public class LedgerEntry {
-    long ledgerId;
+public class LedgerEntry
+    implements org.apache.bookkeeper.client.api.LedgerEntry {
+
+    final long ledgerId;
     long entryId;
     long length;
     ByteBuf data;
@@ -43,14 +45,17 @@ public class LedgerEntry {
         this.entryId = eId;
     }
 
+    @Override
     public long getLedgerId() {
         return ledgerId;
     }
 
+    @Override
     public long getEntryId() {
         return entryId;
     }
 
+    @Override
     public long getLength() {
         return length;
     }
@@ -63,6 +68,7 @@ public class LedgerEntry {
      * @return the content of the entry
      * @throws IllegalStateException if this method is called twice
      */
+    @Override
     public byte[] getEntry() {
         Preconditions.checkState(null != data, "entry content can be accessed only once");
         byte[] entry = new byte[data.readableBytes()];
@@ -99,6 +105,7 @@ public class LedgerEntry {
      * @throws IllegalStateException if the entry has been retrieved by {@link #getEntry()}
      * or {@link #getEntryInputStream()}.
      */
+    @Override
     public ByteBuf getEntryBuffer() {
         Preconditions.checkState(null != data, "entry content has been retrieved" +
             " by #getEntry or #getEntryInputStream");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index fe1104a..172f9ec 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -420,7 +420,7 @@ public class LedgerFragmentReplicator {
                 // such as (addEnsemble) would update it too.
                 lh
                         .rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(
-                                lh.bk.mainWorkerPool, lh.getId()) {
+                                lh.bk.getMainWorkerPool(), lh.getId()) {
                             @Override
                             public void safeOperationComplete(int rc,
                                     LedgerMetadata newMeta) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 8e934ad..bb7c954 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -46,11 +46,19 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmed;
+import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadResult;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCloseCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -68,7 +76,7 @@ import org.slf4j.LoggerFactory;
  * Ledger handle contains ledger metadata and is used to access the read and
  * write operations to a ledger.
  */
-public class LedgerHandle implements AutoCloseable {
+public class LedgerHandle implements WriteHandle {
     final static Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
 
     final byte[] ledgerKey;
@@ -311,18 +319,24 @@ public class LedgerHandle implements AutoCloseable {
     }
 
     /**
-     * Close this ledger synchronously.
-     * @see #asyncClose
+     * {@inheritDoc }
      */
+    @Override
     public void close()
             throws InterruptedException, BKException {
-        CompletableFuture<Void> counter = new CompletableFuture<>();
-
-        asyncClose(new SyncCloseCallback(), counter);
+        SyncCallbackUtils.waitForResult(asyncClose());
+    }
 
+    /**
+     * {@inheritDoc }
+     */
+    @Override
+    public CompletableFuture<Void> asyncClose() {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        SyncCloseCallback callback = new SyncCloseCallback(result);
+        asyncClose(callback, null);
         explicitLacFlushPolicy.stopExplicitLacFlush();
-
-        SynchCallbackUtils.waitForResult(counter);
+        return result;
     }
 
     /**
@@ -370,7 +384,7 @@ public class LedgerHandle implements AutoCloseable {
      * @param rc
      */
     void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
-        bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+        bk.getMainWorkerPool().submitOrdered(ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
                 final long prevLastEntryId;
@@ -430,13 +444,13 @@ public class LedgerHandle implements AutoCloseable {
 
                 final class CloseCb extends OrderedSafeGenericCallback<Void> {
                     CloseCb() {
-                        super(bk.mainWorkerPool, ledgerId);
+                        super(bk.getMainWorkerPool(), ledgerId);
                     }
 
                     @Override
                     public void safeOperationComplete(final int rc, Void result) {
                         if (rc == BKException.Code.MetadataVersionException) {
-                            rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.mainWorkerPool,
+                            rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
                                                                                           ledgerId) {
                                 @Override
                                 public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
@@ -512,11 +526,11 @@ public class LedgerHandle implements AutoCloseable {
      */
     public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
             throws InterruptedException, BKException {
-        CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>();
+        CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>();
 
-        asyncReadEntries(firstEntry, lastEntry, new SyncReadCallback(), counter);
+        asyncReadEntries(firstEntry, lastEntry, new SyncReadCallback(result), null);
 
-        return SynchCallbackUtils.waitForResult(counter);
+        return SyncCallbackUtils.waitForResult(result);
     }
 
     /**
@@ -535,11 +549,11 @@ public class LedgerHandle implements AutoCloseable {
      */
     public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long lastEntry)
             throws InterruptedException, BKException {
-        CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>();
+        CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>();
 
-        asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncReadCallback(), counter);
+        asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncReadCallback(result), null);
 
-        return SynchCallbackUtils.waitForResult(counter);
+        return SyncCallbackUtils.waitForResult(result);
     }
 
     /**
@@ -612,8 +626,53 @@ public class LedgerHandle implements AutoCloseable {
         asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx);
     }
 
+    /**
+     * Read a sequence of entries asynchronously.
+     *
+     * @param firstEntry
+     *          id of first entry of sequence
+     * @param lastEntry
+     *          id of last entry of sequence
+     */
+    @Override
+    public CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>> read(long firstEntry, long lastEntry) {
+        FutureReadResult result = new FutureReadResult();
+        asyncReadEntries(firstEntry, lastEntry, result, null);
+        return result;
+    }
+
+    /**
+     * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
+     * <br>This is the same of
+     * {@link #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) }
+     * but it lets the client read without checking the local value of LastAddConfirmed, so that it is possibile to
+     * read entries for which the writer has not received the acknowledge yet. <br>
+     * For entries which are within the range 0..LastAddConfirmed BookKeeper guarantees that the writer has successfully
+     * received the acknowledge.<br>
+     * For entries outside that range it is possible that the writer never received the acknowledge
+     * and so there is the risk that the reader is seeing entries before the writer and this could result in a consistency
+     * issue in some cases.<br>
+     * With this method you can even read entries before the LastAddConfirmed and entries after it with one call,
+     * the expected consistency will be as described above for each subrange of ids.
+     *
+     * @param firstEntry
+     *          id of first entry of sequence
+     * @param lastEntry
+     *          id of last entry of sequence
+     *
+     * @see #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object)
+     * @see #asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, java.lang.Object)
+     * @see #readUnconfirmedEntries(long, long)
+     */
+    @Override
+    public CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>> readUnconfirmed(long firstEntry, long lastEntry) {
+        FutureReadResult result = new FutureReadResult();
+        asyncReadUnconfirmedEntries(firstEntry, lastEntry, result, null);
+        return result;
+    }
+
     void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) {
-        new PendingReadOp(this, bk.scheduler,
+        new PendingReadOp(this, bk.getScheduler(),
                           firstEntry, lastEntry, cb, ctx).initiate();
     }
 
@@ -629,6 +688,16 @@ public class LedgerHandle implements AutoCloseable {
     }
 
     /**
+     * {@inheritDoc }
+     */
+    @Override
+    public CompletableFuture<Long> append(ByteBuf data) {
+        SyncAddCallback callback = new SyncAddCallback();
+        asyncAddEntry(data, callback, null);
+        return callback;
+    }
+
+    /**
      * Add entry synchronously to an open ledger. This can be used only with
      * {@link LedgerHandleAdv} returned through ledgers created with {@link
      * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}.
@@ -662,12 +731,10 @@ public class LedgerHandle implements AutoCloseable {
             LOG.debug("Adding entry {}", data);
         }
 
-        CompletableFuture<Long> counter = new CompletableFuture<>();
-
         SyncAddCallback callback = new SyncAddCallback();
-        asyncAddEntry(data, offset, length, callback, counter);
+        asyncAddEntry(data, offset, length, callback, null);
 
-        return SynchCallbackUtils.waitForResult(counter);
+        return SyncCallbackUtils.waitForResult(callback);
     }
 
     /**
@@ -829,7 +896,7 @@ public class LedgerHandle implements AutoCloseable {
         if (wasClosed) {
             // make sure the callback is triggered in main worker pool
             try {
-                bk.mainWorkerPool.submit(new SafeRunnable() {
+                bk.getMainWorkerPool().submit(new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
@@ -850,7 +917,7 @@ public class LedgerHandle implements AutoCloseable {
         }
 
         try {
-            bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+            bk.getMainWorkerPool().submitOrdered(ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     ByteBuf toSend = macManager.computeDigestAndPackageForSending(entryId, lastAddConfirmed,
@@ -968,6 +1035,25 @@ public class LedgerHandle implements AutoCloseable {
         new TryReadLastConfirmedOp(this, innercb, getLastAddConfirmed()).initiate();
     }
 
+    /**
+     * @{@inheritDoc }
+     */
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmed() {
+        FutureReadLastConfirmed result = new FutureReadLastConfirmed();
+        asyncTryReadLastConfirmed(result, null);
+        return result;
+    }
+
+    /**
+     * @{@inheritDoc }
+     */
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmed() {
+        FutureReadLastConfirmed result = new FutureReadLastConfirmed();
+        asyncReadLastConfirmed(result, null);
+        return result;
+    }
 
     /**
      * Asynchronous read next entry and the latest last add confirmed.
@@ -1040,7 +1126,7 @@ public class LedgerHandle implements AutoCloseable {
                 }
             }
         };
-        new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1, timeOutInMillis, bk.scheduler).parallelRead(parallel).initiate();
+        new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1, timeOutInMillis, bk.getScheduler()).parallelRead(parallel).initiate();
     }
 
     /**
@@ -1400,7 +1486,7 @@ public class LedgerHandle implements AutoCloseable {
         ChangeEnsembleCb(EnsembleInfo ensembleInfo,
                          int curBlockAddCompletions,
                          int ensembleChangeIdx) {
-            super(bk.mainWorkerPool, ledgerId);
+            super(bk.getMainWorkerPool(), ledgerId);
             this.ensembleInfo = ensembleInfo;
             this.curBlockAddCompletions = curBlockAddCompletions;
             this.ensembleChangeIdx = ensembleChangeIdx;
@@ -1460,7 +1546,7 @@ public class LedgerHandle implements AutoCloseable {
                                EnsembleInfo ensembleInfo,
                                int curBlockAddCompletions,
                                int ensembleChangeIdx) {
-            super(bk.mainWorkerPool, ledgerId);
+            super(bk.getMainWorkerPool(), ledgerId);
             this.rc = rc;
             this.ensembleInfo = ensembleInfo;
             this.curBlockAddCompletions = curBlockAddCompletions;
@@ -1702,11 +1788,11 @@ public class LedgerHandle implements AutoCloseable {
             return;
         }
 
-        writeLedgerConfig(new OrderedSafeGenericCallback<Void>(bk.mainWorkerPool, ledgerId) {
+        writeLedgerConfig(new OrderedSafeGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
             @Override
             public void safeOperationComplete(final int rc, Void result) {
                 if (rc == BKException.Code.MetadataVersionException) {
-                    rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.mainWorkerPool,
+                    rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
                                                                                   ledgerId) {
                         @Override
                         public void safeOperationComplete(int rc, LedgerMetadata newMeta) {
@@ -1757,100 +1843,4 @@ public class LedgerHandle implements AutoCloseable {
         }
     }
 
-    static class LastAddConfirmedCallback implements AddLacCallback {
-        static final LastAddConfirmedCallback INSTANCE = new LastAddConfirmedCallback();
-        /**
-         * Implementation of callback interface for synchronous read method.
-         *
-         * @param rc
-         *          return code
-         * @param lh
-         *          ledger identifier
-         * @param ctx
-         *          control object
-         */
-        @Override
-        public void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
-            if (rc != BKException.Code.OK) {
-                LOG.warn("LastAddConfirmedUpdate failed: {} ", BKException.getMessage(rc));
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Callback LAC Updated for: {} ", lh.getId());
-                }
-            }
-        }
-    }
-
-    static class SyncReadCallback implements ReadCallback {
-        /**
-         * Implementation of callback interface for synchronous read method.
-         *
-         * @param rc
-         *          return code
-         * @param lh
-         *          ledger handle
-         * @param seq
-         *          sequence of entries
-         * @param ctx
-         *          control object
-         */
-        @Override
-        @SuppressWarnings("unchecked")
-        public void readComplete(int rc, LedgerHandle lh,
-                                 Enumeration<LedgerEntry> seq, Object ctx) {
-            SynchCallbackUtils.finish(rc, seq, (CompletableFuture<Enumeration<LedgerEntry>>)ctx);
-        }
-    }
-
-    static class SyncAddCallback implements AddCallback {
-
-        /**
-         * Implementation of callback interface for synchronous read method.
-         *
-         * @param rc
-         *          return code
-         * @param lh
-         *          ledger handle
-         * @param entry
-         *          entry identifier
-         * @param ctx
-         *          control object
-         */
-        @Override
-        @SuppressWarnings("unchecked")
-        public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
-            SynchCallbackUtils.finish(rc, entry, (CompletableFuture<Long>)ctx);
-        }
-    }
-
-    static class SyncReadLastConfirmedCallback implements ReadLastConfirmedCallback {
-        /**
-         * Implementation of  callback interface for synchronous read last confirmed method.
-         */
-        @Override
-        public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
-            LastConfirmedCtx lcCtx = (LastConfirmedCtx) ctx;
-
-            synchronized(lcCtx) {
-                lcCtx.setRC(rc);
-                lcCtx.setLastConfirmed(lastConfirmed);
-                lcCtx.notify();
-            }
-        }
-    }
-
-    static class SyncCloseCallback implements CloseCallback {
-        /**
-         * Close callback method
-         *
-         * @param rc
-         * @param lh
-         * @param ctx
-         */
-        @Override
-        @SuppressWarnings("unchecked")
-        public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-            SynchCallbackUtils.finish(rc, null, (CompletableFuture<Void>)ctx);
-        }
-    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 672edd8..39f9932 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -21,6 +21,9 @@
 
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.createFuture;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.completeExceptionally;
+
 import java.io.Serializable;
 import java.security.GeneralSecurityException;
 import java.util.Comparator;
@@ -28,22 +31,22 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import org.apache.bookkeeper.client.api.WriteAdvHandle;
 
 /**
  * Ledger Advanced handle extends {@link LedgerHandle} to provide API to add entries with
  * user supplied entryIds. Through this interface Ledger Length may not be accurate while the
  * ledger being written.
  */
-public class LedgerHandleAdv extends LedgerHandle {
+public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
     final static Logger LOG = LoggerFactory.getLogger(LedgerHandleAdv.class);
 
     static class PendingOpsComparator implements Comparator<PendingAddOp>, Serializable {
@@ -96,13 +99,11 @@ public class LedgerHandleAdv extends LedgerHandle {
             LOG.debug("Adding entry {}", data);
         }
 
-        CompletableFuture<Long> counter = new CompletableFuture<>();
-
         SyncAddCallback callback = new SyncAddCallback();
-        asyncAddEntry(entryId, data, offset, length, callback, counter);
+        asyncAddEntry(entryId, data, offset, length, callback, null);
 
         try {
-            return counter.get();
+            return callback.get();
         } catch (ExecutionException err) {
             throw (BKException) err.getCause();
         }
@@ -144,9 +145,14 @@ public class LedgerHandleAdv extends LedgerHandle {
      *             if offset or length is negative or offset and length sum to a
      *             value higher than the length of data.
      */
-
+    @Override
     public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length,
             final AddCallback cb, final Object ctx) {
+        asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
+    }
+
+    private void asyncAddEntry(final long entryId, ByteBuf data,
+            final AddCallback cb, final Object ctx) {
         PendingAddOp op = new PendingAddOp(this, cb, ctx);
         op.setEntryId(entryId);
         if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) {
@@ -155,7 +161,7 @@ public class LedgerHandleAdv extends LedgerHandle {
                     LedgerHandleAdv.this, entryId, ctx);
             return;
         }
-        doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
+        doAsyncAddEntry(op, data, cb, ctx);
     }
 
     /**
@@ -187,7 +193,7 @@ public class LedgerHandleAdv extends LedgerHandle {
         if (wasClosed) {
             // make sure the callback is triggered in main worker pool
             try {
-                bk.mainWorkerPool.submit(new SafeRunnable() {
+                bk.getMainWorkerPool().submit(new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
@@ -207,7 +213,7 @@ public class LedgerHandleAdv extends LedgerHandle {
         }
 
         try {
-            bk.mainWorkerPool.submit(new SafeRunnable() {
+            bk.getMainWorkerPool().submit(new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     ByteBuf toSend = macManager.computeDigestAndPackageForSending(op.getEntryId(), lastAddConfirmed,
@@ -225,6 +231,13 @@ public class LedgerHandleAdv extends LedgerHandle {
         }
     }
 
+    @Override
+    public CompletableFuture<Long> write(long entryId, ByteBuf data) {
+        SyncAddCallback callback = new SyncAddCallback();
+        asyncAddEntry(entryId, data, callback, data);
+        return callback;
+    }
+
     /**
      * LedgerHandleAdv will not allow addEntry without providing an entryId
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 55e3987..5683105 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.TextFormat;
 
@@ -193,7 +194,8 @@ public class LedgerMetadata {
         return hasPassword;
     }
 
-    byte[] getPassword() {
+    @VisibleForTesting
+    public byte[] getPassword() {
         return Arrays.copyOf(password, password.length);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 6cf2721..910f33a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -22,12 +22,16 @@
 package org.apache.bookkeeper.client;
 
 import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.security.GeneralSecurityException;
-
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.SyncCallbackUtils.SyncOpenCallback;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
@@ -51,7 +55,7 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
     boolean doRecovery = true;
     boolean administrativeOpen = false;
     long startTime;
-    OpStatsLogger openOpLogger;
+    final OpStatsLogger openOpLogger;
     
     final DigestType suggestedDigestType;
     final boolean enableDigestAutodetection;
@@ -73,8 +77,9 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
         this.passwd = passwd;
         this.cb = cb;
         this.ctx = ctx;
-        this.enableDigestAutodetection = bk.conf.getEnableDigestTypeAutodetection();
+        this.enableDigestAutodetection = bk.getConf().getEnableDigestTypeAutodetection();
         this.suggestedDigestType = digestType;
+        this.openOpLogger = bk.getOpenOpLogger();
     }
 
     public LedgerOpenOp(BookKeeper bk, long ledgerId, OpenCallback cb, Object ctx) {
@@ -87,6 +92,7 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
         this.administrativeOpen = true;
         this.enableDigestAutodetection = false;
         this.suggestedDigestType = bk.conf.getBookieRecoveryDigestType();
+        this.openOpLogger = bk.getOpenOpLogger();
     }
 
     /**
@@ -95,8 +101,6 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
     public void initiate() {
         startTime = MathUtils.nowInNano();
 
-        openOpLogger = bk.getOpenOpLogger();
-
         /**
          * Asynchronously read the ledger metadata node.
          */
@@ -170,7 +174,7 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
         }
 
         if (doRecovery) {
-            lh.recover(new OrderedSafeGenericCallback<Void>(bk.mainWorkerPool, ledgerId) {
+            lh.recover(new OrderedSafeGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
                 @Override
                 public void safeOperationComplete(int rc, Void result) {
                     if (rc == BKException.Code.OK) {
@@ -211,4 +215,85 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
         }
         cb.openComplete(rc, lh, ctx);
     }
+
+    static final class OpenBuilderImpl implements OpenBuilder {
+
+        private boolean builderRecovery = false;
+        private Long builderLedgerId;
+        private byte[] builderPassword;
+        private org.apache.bookkeeper.client.api.DigestType builderDigestType
+                = org.apache.bookkeeper.client.api.DigestType.CRC32;
+        private final BookKeeper bk;
+
+        OpenBuilderImpl(BookKeeper bookkeeper) {
+            this.bk = bookkeeper;
+        }
+
+        @Override
+        public OpenBuilder withLedgerId(long ledgerId) {
+            this.builderLedgerId = ledgerId;
+            return this;
+        }
+
+        @Override
+        public OpenBuilder withRecovery(boolean recovery) {
+            this.builderRecovery = recovery;
+            return this;
+        }
+
+        @Override
+        public OpenBuilder withPassword(byte[] password) {
+            this.builderPassword = password;
+            return this;
+        }
+
+        @Override
+        public OpenBuilder withDigestType(org.apache.bookkeeper.client.api.DigestType digestType) {
+            this.builderDigestType = digestType;
+            return this;
+        }
+
+        @Override
+        public CompletableFuture<ReadHandle> execute() {
+            CompletableFuture<ReadHandle> future = new CompletableFuture<>();
+            SyncOpenCallback callback = new SyncOpenCallback(future);
+            open(callback);
+            return future;
+        }
+
+        private boolean validate() {
+            if (builderLedgerId == null || builderLedgerId < 0) {
+                LOG.error("invalid ledgerId {} < 0", builderLedgerId);
+                return false;
+            }
+            return true;
+        }
+
+        private void open(OpenCallback cb) {
+
+            if (!validate()) {
+                cb.openComplete(BKException.Code.NoSuchLedgerExistsException, null, null);
+                return;
+            }
+
+            LedgerOpenOp op = new LedgerOpenOp(bk, builderLedgerId, DigestType.fromApiDigestType(builderDigestType),
+                builderPassword, cb, null);
+            ReentrantReadWriteLock closeLock = bk.getCloseLock();
+            closeLock.readLock().lock();
+            try {
+                if (bk.isClosed()) {
+                    cb.openComplete(BKException.Code.ClientClosedException, null, null);
+                    return;
+                }
+                if (builderRecovery) {
+                    op.initiate();
+                } else {
+                    op.initiateWithoutRecovery();
+                }
+            } finally {
+                closeLock.readLock().unlock();
+            }
+        }
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 1276fb0..df7c84e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -154,7 +154,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
         if (!callbackDone.get()) {
             startEntryToRead = endEntryToRead + 1;
             endEntryToRead = endEntryToRead + readBatchSize;
-            new RecoveryReadOp(lh, lh.bk.scheduler, startEntryToRead, endEntryToRead, this, null)
+            new RecoveryReadOp(lh, lh.bk.getScheduler(), startEntryToRead, endEntryToRead, this, null)
                     .parallelRead(parallelRead).initiate();
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 5ab2435..af4f35e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -102,7 +102,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
     void sendWriteRequest(int bookieIndex) {
         int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD : BookieProtocol.FLAG_NONE;
 
-        lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey, entryId, toSend,
+        lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey, entryId, toSend,
                 this, bookieIndex, flags);
     }
 
@@ -113,7 +113,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
 
     void timeoutQuorumWait() {
         try {
-            lh.bk.mainWorkerPool.submitOrdered(lh.ledgerId, new SafeRunnable() {
+            lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     if (completed) {
@@ -185,7 +185,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
         }
 
         if (timeoutSec > -1) {
-            this.timeout = lh.bk.bookieClient.scheduleTimeout(this, timeoutSec, TimeUnit.SECONDS);
+            this.timeout = lh.bk.getBookieClient().scheduleTimeout(this, timeoutSec, TimeUnit.SECONDS);
         }
         this.requestTimeNanos = MathUtils.nowInNano();
         this.toSend = toSend;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index 94923e0..1dffb15 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -66,7 +66,7 @@ class PendingReadLacOp implements ReadLacCallback {
 
     public void initiate() {
         for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            lh.bk.bookieClient.readLac(lh.metadata.currentEnsemble.get(i),
+            lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
                     lh.ledgerId, this, i);
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 06b17ec..6b62c0f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -94,8 +94,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
 
             this.ensemble = ensemble;
 
-            if (lh.bk.reorderReadSequence) {
-                this.writeSet = lh.bk.placementPolicy.reorderReadSequence(ensemble,
+            if (lh.bk.isReorderReadSequence()) {
+                this.writeSet = lh.bk.getPlacementPolicy().reorderReadSequence(ensemble,
                     lh.distributionSchedule.getWriteSet(entryId), lh.bookieFailureHistory.asMap());
             } else {
                 this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
@@ -242,7 +242,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
          */
         @Override
         public ListenableFuture<Boolean> issueSpeculativeRequest() {
-            return lh.bk.mainWorkerPool.submitOrdered(lh.getId(), new Callable<Boolean>() {
+            return lh.bk.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws Exception {
                     if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) {
@@ -480,7 +480,6 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         long nextEnsembleChange = startEntryId, i = startEntryId;
         this.requestTimeNanos = MathUtils.nowInNano();
         ArrayList<BookieSocketAddress> ensemble = null;
-
         do {
             if (i == nextEnsembleChange) {
                 ensemble = getLedgerMetadata().getEnsemble(i);
@@ -532,7 +531,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
             lh.throttler.acquire();
         }
 
-        lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
+        lh.bk.getBookieClient().readEntry(to, lh.ledgerId, entry.entryId,
                                      this, new ReadContext(bookieIndex, to, entry));
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index ea9ee3b..755f93d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -71,7 +71,7 @@ class PendingWriteLacOp implements WriteLacCallback {
     }
 
     void sendWriteLacRequest(int bookieIndex) {
-        lh.bk.bookieClient.writeLac(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey,
+        lh.bk.getBookieClient().writeLac(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey,
                 lac, toSend, this, bookieIndex);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index ca067d4..05434d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -437,7 +437,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
      */
     @Override
     public ListenableFuture<Boolean> issueSpeculativeRequest() {
-        return lh.bk.mainWorkerPool.submitOrdered(lh.getId(), new Callable<Boolean>() {
+        return lh.bk.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
             @Override
             public Boolean call() throws Exception {
                 if (!requestComplete.get() && !request.isComplete() &&
@@ -471,7 +471,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
             LOG.debug("Calling Read LAC and Entry with {} and long polling interval {} on Bookie {} - Parallel {}",
                     new Object[] { prevEntryId, timeOutInMillis, to, parallelRead });
         }
-        lh.bk.bookieClient.readEntryWaitForLACUpdate(to,
+        lh.bk.getBookieClient().readEntryWaitForLACUpdate(to,
             lh.ledgerId,
             BookieProtocol.LAST_ADD_CONFIRMED,
             prevEntryId,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index 75264c6..5733205 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -59,7 +59,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
 
     public void initiate() {
         for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            lh.bk.bookieClient.readEntry(lh.metadata.currentEnsemble.get(i),
+            lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
                                          lh.ledgerId,
                                          BookieProtocol.LAST_ADD_CONFIRMED,
                                          this, i);
@@ -68,7 +68,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
 
     public void initiateWithFencing() {
         for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            lh.bk.bookieClient.readEntryAndFenceLedger(lh.metadata.currentEnsemble.get(i),
+            lh.bk.getBookieClient().readEntryAndFenceLedger(lh.metadata.currentEnsemble.get(i),
                                                        lh.ledgerId,
                                                        lh.ledgerKey,
                                                        BookieProtocol.LAST_ADD_CONFIRMED,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 297ea32..b1d1596 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -159,7 +159,7 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
         }
         if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
             try {
-                bk.mainWorkerPool.submitOrdered(ledgerId, new MetadataUpdater(newMetadata));
+                bk.getMainWorkerPool().submitOrdered(ledgerId, new MetadataUpdater(newMetadata));
             } catch (RejectedExecutionException ree) {
                 LOG.error("Failed on submitting updater to update ledger metadata on ledger {} : {}",
                         ledgerId, newMetadata);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
new file mode 100644
index 0000000..e9161cf
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.collect.Iterators;
+import java.util.Enumeration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import static org.apache.bookkeeper.client.LedgerHandle.LOG;
+import org.apache.bookkeeper.client.api.Handle;
+import org.apache.bookkeeper.client.api.ReadHandle;
+
+/**
+ * Utility for callbacks
+ *
+ */
+class SyncCallbackUtils {
+
+    /**
+     * Wait for a result. This is convenience method to implement callbacks
+     *
+     * @param <T>
+     * @param future
+     * @return
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public static <T> T waitForResult(CompletableFuture<T> future) throws InterruptedException, BKException {
+        try {
+            return future.get();
+        } catch (ExecutionException err) {
+            if (err.getCause() instanceof BKException) {
+                throw (BKException) err.getCause();
+            } else {
+                BKException unexpectedConditionException
+                    = BKException.create(BKException.Code.UnexpectedConditionException);
+                unexpectedConditionException.initCause(err.getCause());
+                throw unexpectedConditionException;
+            }
+
+        }
+    }
+
+    /**
+     * Handle the Response Code and transform it to a BKException
+     *
+     * @param <T>
+     * @param rc
+     * @param result
+     * @param future
+     */
+    public static <T> void finish(int rc, T result, CompletableFuture<? super T> future) {
+        if (rc != BKException.Code.OK) {
+            future.completeExceptionally(BKException.create(rc).fillInStackTrace());
+        } else {
+            future.complete(result);
+        }
+    }
+
+    static class SyncCreateCallback implements AsyncCallback.CreateCallback {
+
+        private final CompletableFuture<? super LedgerHandle> future;
+
+        public SyncCreateCallback(CompletableFuture<? super LedgerHandle> future) {
+            this.future = future;
+        }
+
+        /**
+         * Create callback implementation for synchronous create call.
+         *
+         * @param rc return code
+         * @param lh ledger handle object
+         * @param ctx optional control object
+         */
+        @Override
+        public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+            finish(rc, lh, future);
+        }
+
+    }
+
+    static class SyncCreateAdvCallback implements AsyncCallback.CreateCallback {
+
+        private final CompletableFuture<? super LedgerHandleAdv> future;
+
+        public SyncCreateAdvCallback(CompletableFuture<? super LedgerHandleAdv> future) {
+            this.future = future;
+        }
+
+        /**
+         * Create callback implementation for synchronous create call.
+         *
+         * @param rc return code
+         * @param lh ledger handle object
+         * @param ctx optional control object
+         */
+        @Override
+        public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+            finish(rc, (LedgerHandleAdv) lh, future);
+        }
+
+    }
+
+    static class SyncOpenCallback implements AsyncCallback.OpenCallback {
+
+        private final CompletableFuture<? super LedgerHandle> future;
+
+        public SyncOpenCallback(CompletableFuture<? super LedgerHandle> future) {
+            this.future = future;
+        }
+
+        /**
+         * Callback method for synchronous open operation
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param ctx
+         *          optional control object
+         */
+        @Override
+        public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+            finish(rc, lh, future);
+        }
+    }
+
+    static class SyncDeleteCallback implements AsyncCallback.DeleteCallback {
+
+        private final CompletableFuture<Void> future;
+
+        public SyncDeleteCallback(CompletableFuture<Void> future) {
+            this.future = future;
+        }
+
+
+        /**
+         * Delete callback implementation for synchronous delete call.
+         *
+         * @param rc
+         *            return code
+         * @param ctx
+         *            optional control object
+         */
+        @Override
+        public void deleteComplete(int rc, Object ctx) {
+            finish(rc, null, future);
+        }
+    }
+
+    static class LastAddConfirmedCallback implements AsyncCallback.AddLacCallback {
+        static final LastAddConfirmedCallback INSTANCE = new LastAddConfirmedCallback();
+        /**
+         * Implementation of callback interface for synchronous read method.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger identifier
+         * @param ctx
+         *          control object
+         */
+        @Override
+        public void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
+            if (rc != BKException.Code.OK) {
+                LOG.warn("LastAddConfirmedUpdate failed: {} ", BKException.getMessage(rc));
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Callback LAC Updated for: {} ", lh.getId());
+                }
+            }
+        }
+    }
+
+    static class SyncReadCallback implements AsyncCallback.ReadCallback {
+
+        private final CompletableFuture<Enumeration<LedgerEntry>> future;
+
+        public SyncReadCallback(CompletableFuture<Enumeration<LedgerEntry>> future) {
+            this.future = future;
+        }
+
+        /**
+         * Implementation of callback interface for synchronous read method.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param seq
+         *          sequence of entries
+         * @param ctx
+         *          control object
+         */
+        @Override
+        public void readComplete(int rc, LedgerHandle lh,
+                                 Enumeration<LedgerEntry> seq, Object ctx) {
+            finish(rc, seq, future);
+        }
+    }
+
+    static class FutureReadResult
+        extends CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>
+        implements AsyncCallback.ReadCallback {
+
+        /**
+         * Implementation of callback interface for read method of {@link ReadHandle}.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param seq
+         *          sequence of entries
+         * @param ctx
+         *          control object
+         */
+        @Override
+        @SuppressWarnings("unchecked")
+        public void readComplete(int rc, LedgerHandle lh,
+                                 Enumeration<LedgerEntry> seq, Object ctx) {
+            if (rc != BKException.Code.OK) {
+                this.completeExceptionally(BKException.create(rc).fillInStackTrace());
+            } else {
+                this.complete((Iterable) () -> Iterators.forEnumeration(seq));
+            }
+        }
+    }
+
+    static class SyncAddCallback extends CompletableFuture<Long> implements AsyncCallback.AddCallback {
+
+        /**
+         * Implementation of callback interface for synchronous read method.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param entry
+         *          entry identifier
+         * @param ctx
+         *          control object
+         */
+        @Override
+        public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
+            finish(rc, entry, this);
+        }
+    }
+
+    static class FutureReadLastConfirmed extends CompletableFuture<Long> implements AsyncCallback.ReadLastConfirmedCallback {
+
+        @Override
+        public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+            finish(rc, lastConfirmed, this);
+        }
+
+    }
+
+    static class SyncReadLastConfirmedCallback implements AsyncCallback.ReadLastConfirmedCallback {
+        /**
+         * Implementation of  callback interface for synchronous read last confirmed method.
+         */
+        @Override
+        public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+            LedgerHandle.LastConfirmedCtx lcCtx = (LedgerHandle.LastConfirmedCtx) ctx;
+
+            synchronized(lcCtx) {
+                lcCtx.setRC(rc);
+                lcCtx.setLastConfirmed(lastConfirmed);
+                lcCtx.notify();
+            }
+        }
+    }
+
+    static class SyncCloseCallback implements AsyncCallback.CloseCallback {
+
+        private final CompletableFuture<Void> future;
+
+        public SyncCloseCallback(CompletableFuture<Void> future) {
+            this.future = future;
+        }
+
+        /**
+         * Close callback method
+         *
+         * @param rc
+         * @param lh
+         * @param ctx
+         */
+        @Override
+        public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+            finish(rc, null, future);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java
deleted file mode 100644
index 656c6d9..0000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.client;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Utility for callbacks
- * 
- */
-class SynchCallbackUtils {
-
-    /**
-     * Wait for a result. This is convenience method to implement callbacks
-     *
-     * @param <T>
-     * @param future
-     * @return
-     * @throws InterruptedException
-     * @throws BKException
-     */
-    public static <T> T waitForResult(CompletableFuture<T> future) throws InterruptedException, BKException {
-        try {
-            return future.get();
-        } catch (ExecutionException err) {
-            if (err.getCause() instanceof BKException) {
-                throw (BKException) err.getCause();
-            } else {
-                BKException unexpectedConditionException
-                    = BKException.create(BKException.Code.UnexpectedConditionException);
-                unexpectedConditionException.initCause(err.getCause());
-                throw unexpectedConditionException;
-            }
-
-        }
-    }
-
-    /**
-     * Handle the Response Code and transform it to a BKException
-     * @param <T>
-     * @param rc
-     * @param result
-     * @param future 
-     */
-    public static <T> void finish(int rc, T result, CompletableFuture<T> future) {
-        if (rc != BKException.Code.OK) {
-            future.completeExceptionally(BKException.create(rc).fillInStackTrace());
-        } else {
-            future.complete(result);
-        }
-    }
-
-}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
index ff07575..6eaa65e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
@@ -51,7 +51,7 @@ class TryReadLastConfirmedOp implements ReadEntryCallback {
 
     public void initiate() {
         for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            lh.bk.bookieClient.readEntry(lh.metadata.currentEnsemble.get(i),
+            lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
                                          lh.ledgerId,
                                          BookieProtocol.LAST_ADD_CONFIRMED,
                                          this, i);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
new file mode 100644
index 0000000..737509d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client.api;
+
+import org.apache.bookkeeper.client.LedgerHandleAdv;
+
+/**
+ * Super class for all errors which occur using BookKeeper client.
+ *
+ * @since 4.6
+ */
+public abstract class BKException extends Exception {
+    protected final int code;
+
+    /**
+     * Create a new exception.
+     *
+     * @param code the error code
+     *
+     * @see Code
+     */
+    public BKException(int code) {
+        super(getMessage(code));
+        this.code = code;
+    }
+
+    /**
+     * Get the return code for the exception.
+     *
+     * @return the error code
+     *
+     * @see Code
+     */
+    public final int getCode() {
+        return this.code;
+    }
+
+    /**
+     * Describe an error code.
+     *
+     * @param code
+     *
+     * @return the description of the error code
+     */
+    public static String getMessage(int code) {
+        switch (code) {
+        case Code.OK:
+            return "No problem";
+        case Code.ReadException:
+            return "Error while reading ledger";
+        case Code.QuorumException:
+            return "Invalid quorum size on ensemble size";
+        case Code.NoBookieAvailableException:
+            return "Invalid quorum size on ensemble size";
+        case Code.DigestNotInitializedException:
+            return "Digest engine not initialized";
+        case Code.DigestMatchException:
+            return "Entry digest does not match";
+        case Code.NotEnoughBookiesException:
+            return "Not enough non-faulty bookies available";
+        case Code.NoSuchLedgerExistsException:
+            return "No such ledger exists";
+        case Code.BookieHandleNotAvailableException:
+            return "Bookie handle is not available";
+        case Code.ZKException:
+            return "Error while using ZooKeeper";
+        case Code.MetaStoreException:
+            return "Error while using MetaStore";
+        case Code.LedgerExistException:
+            return "Ledger existed";
+        case Code.LedgerRecoveryException:
+            return "Error while recovering ledger";
+        case Code.LedgerClosedException:
+            return "Attempt to write to a closed ledger";
+        case Code.WriteException:
+            return "Write failed on bookie";
+        case Code.NoSuchEntryException:
+            return "No such entry";
+        case Code.IncorrectParameterException:
+            return "Incorrect parameter input";
+        case Code.InterruptedException:
+            return "Interrupted while waiting for permit";
+        case Code.ProtocolVersionException:
+            return "Bookie protocol version on server is incompatible with client";
+        case Code.MetadataVersionException:
+            return "Bad ledger metadata version";
+        case Code.DuplicateEntryIdException:
+            return "Attempted to add Duplicate entryId";
+        case Code.LedgerFencedException:
+            return "Ledger has been fenced off. Some other client must have opened it to read";
+        case Code.UnauthorizedAccessException:
+            return "Attempted to access ledger using the wrong password";
+        case Code.UnclosedFragmentException:
+            return "Attempting to use an unclosed fragment; This is not safe";
+        case Code.WriteOnReadOnlyBookieException:
+            return "Attempting to write on ReadOnly bookie";
+        case Code.LedgerIdOverflowException:
+            return "Next ledgerID is too large.";
+        case Code.ReplicationException:
+            return "Errors in replication pipeline";
+        case Code.ClientClosedException:
+            return "BookKeeper client is closed";
+        case Code.IllegalOpException:
+            return "Invalid operation";
+        case Code.AddEntryQuorumTimeoutException:
+            return "Add entry quorum wait timed out";
+        case Code.TimeoutException:
+            return "Bookie operation timeout";
+        case Code.SecurityException:
+            return "Failed to establish a secure connection";
+        default:
+            return "Unexpected condition";
+        }
+    }
+
+    /**
+     * Codes which represent the various exceptoin types.
+     */
+    public interface Code {
+        /** A placer holder (unused). */
+        int UNINITIALIZED = 1;
+        /** Everything is OK. */
+        int OK = 0;
+        /** Read operations failed (bookie error). */
+        int ReadException = -1;
+        /** Unused. */
+        int QuorumException = -2;
+        /** Unused. */
+        int NoBookieAvailableException = -3;
+        /** Digest Manager is not initialized (client error). */
+        int DigestNotInitializedException = -4;
+        /** Digest doesn't match on returned entries. */
+        int DigestMatchException = -5;
+        /** Not enough bookies available to form an ensemble. */
+        int NotEnoughBookiesException = -6;
+        /** No such ledger exists. */
+        int NoSuchLedgerExistsException = -7;
+        /** Bookies are not available. */
+        int BookieHandleNotAvailableException = -8;
+        /** ZooKeeper operations failed. */
+        int ZKException = -9;
+        /** Ledger recovery operations failed. */
+        int LedgerRecoveryException = -10;
+        /** Executing operations on a closed ledger handle. */
+        int LedgerClosedException = -11;
+        /** Write operations failed (bookie error). */
+        int WriteException = -12;
+        /** No such entry exists. */
+        int NoSuchEntryException = -13;
+        /** Incorrect parameters (operations are absolutely not executed). */
+        int IncorrectParameterException = -14;
+        /** Synchronous operations are interrupted. */
+        int InterruptedException = -15;
+        /** Protocol version is wrong (operations are absolutely not executed). */
+        int ProtocolVersionException = -16;
+        /** Bad version on executing metadata operations. */
+        int MetadataVersionException = -17;
+        /** Meta store operations failed. */
+        int MetaStoreException = -18;
+        /** Executing operations on a closed client. */
+        int ClientClosedException = -19;
+        /** Ledger already exists. */
+        int LedgerExistException = -20;
+        /**
+         * Add entry operation timeouts on waiting quorum responses.
+         *
+         * @since 4.5
+         */
+        int AddEntryQuorumTimeoutException = -21;
+        /**
+         * Duplicated entry id is found when {@link LedgerHandleAdv#addEntry(long, byte[])}.
+         *
+         * @since 4.5
+         */
+        int DuplicateEntryIdException = -22;
+        /**
+         * Operations timeouts.
+         *
+         * @since 4.5
+         */
+        int TimeoutException = -23;
+        int SecurityException = -24;
+
+        /**
+         * Operation is illegal.
+         */
+        int IllegalOpException = -100;
+        /**
+         * Operations failed due to ledgers are fenced.
+         */
+        int LedgerFencedException = -101;
+        /**
+         * Operations failed due to unauthorized.
+         */
+        int UnauthorizedAccessException = -102;
+        /**
+         * Replication failed due to unclosed fragments.
+         */
+        int UnclosedFragmentException = -103;
+        /**
+         * Write operations failed due to bookies are readonly.
+         */
+        int WriteOnReadOnlyBookieException = -104;
+        //-105 reserved for TooManyRequestsException
+        /**
+         * Ledger id overflow happens on ledger manager.
+         *
+         * @since 4.5
+         */
+        int LedgerIdOverflowException = -106;
+
+        /**
+         * Generic exception code used to propagate in replication pipeline.
+         */
+        int ReplicationException = -200;
+
+        /**
+         * Unexpected condition.
+         */
+        int UnexpectedConditionException = -999;
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java
new file mode 100644
index 0000000..74cb8d3
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java
@@ -0,0 +1,73 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import org.apache.bookkeeper.client.impl.BookKeeperBuilderImpl;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+
+/**
+ * This is the entry point for BookKeeper client side API.
+ *
+ * @since 4.6
+ */
+public interface BookKeeper extends AutoCloseable {
+
+    /**
+     * Create a new builder which can be used to boot a new BookKeeper client.
+     *
+     * @param clientConfiguration the configuration for the client
+     * @return a builder
+     */
+    static BookKeeperBuilder newBuilder(final ClientConfiguration clientConfiguration) {
+        return new BookKeeperBuilderImpl(clientConfiguration);
+    }
+
+    /**
+     * Start the creation of a new ledger.
+     *
+     * @return a builder for the new ledger
+     */
+    CreateBuilder newCreateLedgerOp();
+
+    /**
+     * Open an existing ledger.
+     *
+     * @return a builder useful to create a readable handler for an existing ledger
+     */
+    OpenBuilder newOpenLedgerOp();
+
+    /**
+     * Delete an existing ledger.
+     *
+     * @return a builder useful to delete an existing ledger
+     */
+    DeleteBuilder newDeleteLedgerOp();
+
+    /**
+     * Close the client and release every resource.
+     *
+     * @throws BKException
+     * @throws InterruptedException
+     */
+    @Override
+    void close() throws BKException, InterruptedException;
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
new file mode 100644
index 0000000..fe5bfdb
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
@@ -0,0 +1,104 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import java.io.IOException;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * BookKeeper Client Builder to build client instances.
+ *
+ * @since 4.6
+ */
+public interface BookKeeperBuilder {
+
+    /**
+     * Configure the bookkeeper client with a provided Netty EventLoopGroup.
+     *
+     * @param eventLoopGroup an external {@link EventLoopGroup} to use by the bookkeeper client.
+     *
+     * @return client builder.
+     */
+    BookKeeperBuilder eventLoopGroup(EventLoopGroup eventLoopGroup);
+
+    /**
+     * Configure the bookkeeper client with a provided ZooKeeper client.
+     *
+     * @param zk an external {@link ZooKeeper} client to use by the bookkeeper client.
+     *
+     * @return client builder.
+     */
+    BookKeeperBuilder zk(ZooKeeper zk);
+
+    /**
+     * Configure the bookkeeper client with a provided {@link StatsLogger}.
+     *
+     * @param statsLogger an {@link StatsLogger} to use by the bookkeeper client to collect stats generated by the
+     * client.
+     *
+     * @return client builder.
+     */
+    BookKeeperBuilder statsLogger(StatsLogger statsLogger);
+
+    /**
+     * Configure the bookkeeper client to use the provided dns resolver {@link DNSToSwitchMapping}.
+     *
+     * @param dnsResolver dns resolver for placement policy to use for resolving network locations.
+     *
+     * @return client builder
+     */
+    BookKeeperBuilder dnsResolver(DNSToSwitchMapping dnsResolver);
+
+    /**
+     * Configure the bookkeeper client to use a provided Netty HashedWheelTimer.
+     *
+     * @param requestTimer request timer for client to manage timer related tasks.
+     *
+     * @return client builder
+     */
+    BookKeeperBuilder requestTimer(HashedWheelTimer requestTimer);
+
+    /**
+     * Configure the bookkeeper client to use a provided {@link FeatureProvider}.
+     *
+     * @param featureProvider the feature provider
+     *
+     * @return client builder
+     */
+    BookKeeperBuilder featureProvider(FeatureProvider featureProvider);
+
+    /**
+     * Start and initialize a new BookKeeper client.
+     *
+     * @return the client
+     *
+     * @throws BKException
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    BookKeeper build() throws BKException, InterruptedException, IOException;
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateAdvBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateAdvBuilder.java
new file mode 100644
index 0000000..d4f9ffa
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateAdvBuilder.java
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+/**
+ * Builder-style interface to create new ledgers.
+ *
+ * @since 4.6
+ * @see BookKeeper#newCreateLedgerOp()
+ */
+public interface CreateAdvBuilder extends OpBuilder<WriteAdvHandle> {
+
+    /**
+     * Set a fixed ledgerId for the newly created ledger. If no explicit ledgerId is passed a new ledger id will be
+     * assigned automatically.
+     *
+     * @param ledgerId
+     *
+     * @return the builder itself
+     */
+    CreateAdvBuilder withLedgerId(long ledgerId);
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
new file mode 100644
index 0000000..6d7d1ee
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/CreateBuilder.java
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.util.Map;
+
+/**
+ * Builder-style interface to create new ledgers.
+ *
+ * @since 4.6
+ * @see BookKeeper#newCreateLedgerOp()
+ */
+public interface CreateBuilder extends OpBuilder<WriteHandle> {
+
+    /**
+     * Set the number of bookies which will receive data for this ledger. It defaults to 3.
+     *
+     * @param ensembleSize the number of bookies
+     *
+     * @return the builder itself
+     */
+    CreateBuilder withEnsembleSize(int ensembleSize);
+
+    /**
+     * Set the number of bookies which receive every single entry. In case of ensembleSize > writeQuorumSize data will
+     * be striped across a number of ensembleSize bookies. It defaults to 2.
+     *
+     * @param writeQuorumSize the replication factor for each entry
+     *
+     * @return the builder itself
+     */
+    CreateBuilder withWriteQuorumSize(int writeQuorumSize);
+
+    /**
+     * Set the number of acknowledgements to wait before considering a write to be completed with success. This value
+     * can be less or equals to writeQuorumSize. It defaults to 2.
+     *
+     * @param ackQuorumSize the number of acknowledgements to wait for
+     *
+     * @return the builder itself
+     */
+    CreateBuilder withAckQuorumSize(int ackQuorumSize);
+
+    /**
+     * Set a password for the ledger. It defaults to empty password
+     *
+     * @param password the password
+     *
+     * @return the builder itself
+     */
+    CreateBuilder withPassword(byte[] password);
+
+    /**
+     * Set a map a custom data to be attached to the ledger. The application is responsible for the semantics of these
+     * data.
+     *
+     * @param customMetadata the ledger metadata
+     *
+     * @return the builder itself
+     */
+    CreateBuilder withCustomMetadata(Map<String, byte[]> customMetadata);
+
+    /**
+     * Set the Digest type used to guard data against corruption. It defaults to {@link DigestType#CRC32}
+     *
+     * @param digestType the type of digest
+     *
+     * @return the builder itself
+     */
+    CreateBuilder withDigestType(DigestType digestType);
+
+    /**
+     * Switch the ledger into 'Advanced' mode. A ledger used in Advanced mode will explicitly generate the sequence of
+     * entry identifiers. Advanced ledgers can be created with a client side defined ledgerId
+     *
+     * @return a new {@link CreateAdvBuilder} builder
+     */
+    CreateAdvBuilder makeAdv();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/DeleteBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/DeleteBuilder.java
new file mode 100644
index 0000000..0652d5c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/DeleteBuilder.java
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+/**
+ * Builder-style interface to delete exiting ledgers.
+ *
+ * @since 4.6
+ */
+public interface DeleteBuilder extends OpBuilder<Void> {
+
+    /**
+     * Set the id of the ledger to be deleted.
+     *
+     * @param ledgerId
+     *
+     * @return the builder itself
+     */
+    DeleteBuilder withLedgerId(long ledgerId);
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/DigestType.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/DigestType.java
new file mode 100644
index 0000000..f6c59ff
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/DigestType.java
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+/**
+ * Digest type.
+ *
+ * @since 4.6
+ */
+public enum DigestType {
+
+    /**
+     * Entries are verified by applied CRC32 algorithm.
+     */
+    CRC32,
+    /**
+     * Entries are verified by applied MAC algorithm.
+     */
+    MAC
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java
new file mode 100644
index 0000000..86cb135
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.util.concurrent.CompletableFuture;
+import lombok.SneakyThrows;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+
+/**
+ * Handle to manage an open ledger.
+ *
+ * @since 4.6
+ */
+public interface Handle extends AutoCloseable {
+
+    /**
+     * Get the id of the current ledger.
+     *
+     * @return the id of the ledger
+     */
+    long getId();
+
+    /**
+     * Close this ledger synchronously.
+     *
+     * @throws org.apache.bookkeeper.client.api.BKException
+     * @throws java.lang.InterruptedException
+     * @see #asyncClose
+     */
+    @Override
+    @SneakyThrows(Exception.class)
+    default void close() throws BKException, InterruptedException {
+        FutureUtils.result(asyncClose());
+    }
+
+    /**
+     * Asynchronous close, any adds in flight will return errors.
+     *
+     * <p>Closing a ledger will ensure that all clients agree on what the last
+     * entry of the ledger is. This ensures that, once the ledger has been closed,
+     * all reads from the ledger will return the same set of entries.
+     *
+     * @return an handle to access the result of the operation
+     *
+     * @see FutureUtils#result(java.util.concurrent.CompletableFuture) to have a simple method to access the result
+     */
+    CompletableFuture<Void> asyncClose();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
new file mode 100644
index 0000000..e56f6d6
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+
+/**
+ * An entry.
+ *
+ * @since 4.6
+ */
+public interface LedgerEntry {
+
+    /**
+     * The id of the ledger which contains the entry.
+     *
+     * @return the id of the ledger
+     */
+    long getLedgerId();
+
+    /**
+     * The id of the entry.
+     *
+     * @return the id of the entry
+     */
+    long getEntryId();
+
+    /**
+     * The length of the entry, that is the size of the content expressed in bytes.
+     *
+     * @return the size of the content
+     */
+    long getLength();
+
+    /**
+     * Returns the content of the entry. This method can be called only once. While using v2 wire protocol this method
+     * will automatically release the internal ByteBuf.
+     *
+     * @return the content of the entry
+     * @throws IllegalStateException if this method is called twice
+     */
+    byte[] getEntry();
+
+    /**
+     * Return the internal buffer that contains the entry payload.
+     *
+     * <p>Note: Using v2 wire protocol it is responsibility of the caller
+     * to ensure to release the buffer after usage.
+     *
+     * @return a ByteBuf which contains the data
+     *
+     * @see ClientConfiguration#setNettyUsePooledBuffers(boolean)
+     * @throws IllegalStateException if the entry has been retrieved by {@link #getEntry()}
+     */
+    ByteBuf getEntryBuffer();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/OpBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/OpBuilder.java
new file mode 100644
index 0000000..72c569c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/OpBuilder.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+
+/**
+ * Base interface for builders.
+ *
+ * @since 4.6
+ */
+public interface OpBuilder<T> {
+
+    /**
+     * Start the operation and return an handle to the result.
+     *
+     * @return an handle to access the result of the operation
+     *
+     * @see FutureUtils#result(java.util.concurrent.CompletableFuture) to have a simple method to access the result
+     */
+    CompletableFuture<T> execute();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/OpenBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/OpenBuilder.java
new file mode 100644
index 0000000..63de713
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/OpenBuilder.java
@@ -0,0 +1,71 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+
+/**
+ * Builder-style interface to open exiting ledgers.
+ *
+ * @since 4.6
+ */
+public interface OpenBuilder extends OpBuilder<ReadHandle> {
+
+    /**
+     * Set the id of the ledger to be opened.
+     *
+     * @param ledgerId
+     *
+     * @return the builder itself
+     */
+    OpenBuilder withLedgerId(long ledgerId);
+
+    /**
+     * Define to open the ledger in recovery mode or in readonly mode. In recovery mode the ledger will be fenced and
+     * the writer of the ledger will be prevented from issuing other writes to the ledger. It defaults to 'false'
+     *
+     * @param recovery recovery mode
+     *
+     * @return the builder itself
+     */
+    OpenBuilder withRecovery(boolean recovery);
+
+    /**
+     * Sets the password to be used to open the ledger. It defauls to an empty password
+     *
+     * @param password the password to unlock the ledger
+     *
+     * @return the builder itself
+     */
+    OpenBuilder withPassword(byte[] password);
+
+    /**
+     * Sets the expected digest type used to check the contents of the ledger. It defaults to {@link DigestType#CRC32}.
+     * If {@link ClientConfiguration#setEnableDigestTypeAutodetection(boolean) } is set to true this value is ignored
+     * and the digest type is read directly from metadata
+     *
+     * @param digestType the type of digest
+     *
+     * @return the builder itself
+     */
+    OpenBuilder withDigestType(DigestType digestType);
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
new file mode 100644
index 0000000..4c1df51
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Provide read access to a ledger.
+ *
+ * @since 4.6
+ */
+public interface ReadHandle extends Handle {
+
+    /**
+     * Read a sequence of entries asynchronously.
+     *
+     * @param firstEntry
+     *          id of first entry of sequence
+     * @param lastEntry
+     *          id of last entry of sequence, inclusive
+     * @return an handle to the result of the operation
+     */
+    CompletableFuture<Iterable<LedgerEntry>> read(long firstEntry, long lastEntry);
+
+    /**
+     * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
+     * <br>This is the same of
+     * {@link #read(long, long) }
+     * but it lets the client read without checking the local value of LastAddConfirmed, so that it is possibile to
+     * read entries for which the writer has not received the acknowledge yet. <br>
+     * For entries which are within the range 0..LastAddConfirmed BookKeeper guarantees that the writer has successfully
+     * received the acknowledge.<br>
+     * For entries outside that range it is possible that the writer never received the acknowledge
+     * and so there is the risk that the reader is seeing entries before the writer and this could
+     * result in a consistency issue in some cases.<br>
+     * With this method you can even read entries before the LastAddConfirmed and entries after it with one call,
+     * the expected consistency will be as described above for each subrange of ids.
+     *
+     * @param firstEntry
+     *          id of first entry of sequence
+     * @param lastEntry
+     *          id of last entry of sequence, inclusive
+     * @return an handle to the result of the operation
+     *
+     * @see #read(long, long)
+     * @see #readLastAddConfirmed()
+     */
+    CompletableFuture<Iterable<LedgerEntry>> readUnconfirmed(long firstEntry, long lastEntry);
+
+    /**
+     * Obtains asynchronously the last confirmed write from a quorum of bookies. This
+     * call obtains the the last add confirmed each bookie has received for this ledger
+     * and returns the maximum. If the ledger has been closed, the value returned by this
+     * call may not correspond to the id of the last entry of the ledger, since it reads
+     * the hint of bookies. Consequently, in the case the ledger has been closed, it may
+     * return a different value than getLastAddConfirmed, which returns the local value
+     * of the ledger handle.
+     *
+     * @return an handle to the result of the operation
+     * @see #getLastAddConfirmed()
+     *
+     */
+    CompletableFuture<Long> readLastAddConfirmed();
+
+    /**
+     * Obtains asynchronously the last confirmed write from a quorum of bookies
+     * but it doesn't wait all the responses from the quorum. It would callback
+     * immediately if it received a LAC which is larger than current LAC.
+     *
+     * @return an handle to the result of the operation
+     * @see #tryReadLastAddConfirmed()
+     *
+     */
+    CompletableFuture<Long> tryReadLastAddConfirmed();
+
+    /**
+     * Get the local value for LastAddConfirmed.
+     *
+     * @return the local value for LastAddConfirmed
+     */
+    long getLastAddConfirmed();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
new file mode 100644
index 0000000..87ba498
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Provide write access to a ledger. Using WriteAdvHandler the writer MUST explictly set an entryId. Beware that the
+ * write for a given entryId will be acknowledged if and only if all entries up to entryId - 1 have been acknowledged
+ * too (expected from entryId 0)
+ *
+ * @see WriteHandle
+ *
+ * @since 4.6
+ */
+public interface WriteAdvHandle extends ReadHandle {
+
+    /**
+     * Add entry asynchronously to an open ledger.
+     *
+     * @param entryId entryId to be added
+     * @param data array of bytes to be written
+     * @return an handle to the result, in case of success it will return the same value of param entryId
+     */
+    default CompletableFuture<Long> write(final long entryId, final ByteBuffer data) {
+        return write(entryId, Unpooled.wrappedBuffer(data));
+    }
+
+    /**
+     * Add entry asynchronously to an open ledger.
+     *
+     * @param entryId entryId to be added
+     * @param data array of bytes to be written
+     * @return an handle to the result, in case of success it will return the same value of param entryId
+     */
+    CompletableFuture<Long> write(final long entryId, final ByteBuf data);
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
new file mode 100644
index 0000000..47e1f9c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Provide write access to a ledger.
+ *
+ * @see WriteAdvHandle
+ *
+ * @since 4.6
+ */
+public interface WriteHandle extends ReadHandle {
+
+    /**
+     * Add entry asynchronously to an open ledger.
+     *
+     * @param data array of bytes to be written
+     * @return an handle to the result, in case of success it will return the id of the newly appended entry
+     */
+    CompletableFuture<Long> append(ByteBuf data);
+
+    /**
+     * Add entry asynchronously to an open ledger.
+     *
+     * @param data array of bytes to be written
+     * @return an handle to the result, in case of success it will return the id of the newly appended entry
+     */
+    default CompletableFuture<Long> append(ByteBuffer data) {
+        return append(Unpooled.wrappedBuffer(data));
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/package-info.java
new file mode 100644
index 0000000..71ad4f1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/package-info.java
@@ -0,0 +1,27 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * BookKeeper Client Public API.
+ *
+ * @since 4.6
+ */
+package org.apache.bookkeeper.client.api;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
new file mode 100644
index 0000000..d9223a4
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.impl;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import java.io.IOException;
+import org.apache.bookkeeper.client.BKException.ZKException;
+import org.apache.bookkeeper.client.api.BookKeeper;
+import org.apache.bookkeeper.client.api.BookKeeperBuilder;
+import org.apache.bookkeeper.client.api.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Internal builder for {@link org.apache.bookkeeper.client.api.BookKeeper} client.
+ *
+ * @since 4.6
+ */
+public class BookKeeperBuilderImpl implements BookKeeperBuilder {
+
+    private final org.apache.bookkeeper.client.BookKeeper.Builder builder;
+
+    public BookKeeperBuilderImpl(ClientConfiguration conf) {
+        this.builder = org.apache.bookkeeper.client.BookKeeper.forConfig(conf);
+    }
+
+    @Override
+    public BookKeeperBuilder eventLoopGroup(EventLoopGroup eventLoopGroup) {
+        builder.eventLoopGroup(eventLoopGroup);
+        return this;
+    }
+
+    @Override
+    public BookKeeperBuilder zk(ZooKeeper zk) {
+        builder.zk(zk);
+        return this;
+    }
+
+    @Override
+    public BookKeeperBuilder statsLogger(StatsLogger statsLogger) {
+        builder.statsLogger(statsLogger);
+        return this;
+    }
+
+    @Override
+    public BookKeeperBuilder dnsResolver(DNSToSwitchMapping dnsResolver) {
+        builder.dnsResolver(dnsResolver);
+        return this;
+    }
+
+    @Override
+    public BookKeeperBuilder requestTimer(HashedWheelTimer requeestTimer) {
+        builder.requestTimer(requeestTimer);
+        return this;
+    }
+
+    @Override
+    public BookKeeperBuilder featureProvider(FeatureProvider featureProvider) {
+        builder.featureProvider(featureProvider);
+        return this;
+    }
+
+    @Override
+    public BookKeeper build() throws InterruptedException, BKException, IOException  {
+        try {
+            return builder.build();
+        } catch (KeeperException err) {
+            ZKException zkErr = new ZKException();
+            zkErr.initCause(err);
+            throw zkErr;
+        }
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/package-info.java
new file mode 100644
index 0000000..aa60d53
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/package-info.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/**
+ * BookKeeper Client implementation package
+ *
+ * @since 4.6
+ */
+package org.apache.bookkeeper.client.impl;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index c2c8ad9..fc78d8f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -545,7 +545,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
             ReplicationVerificationCallback cb = new ReplicationVerificationCallback(numRequests);
             for (long i = startEntryId; i < endEntryId; i++) {
                 for (BookieSocketAddress addr : e.getValue()) {
-                    bkc.bookieClient.readEntry(addr, lh.getId(), i, cb, addr);
+                    bkc.getBookieClient().readEntry(addr, lh.getId(), i, cb, addr);
                 }
             }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index b44d9d2..dbfd737 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -214,7 +214,7 @@ public class BookieWriteLedgerTest extends
             CompletableFuture<Object> done = new CompletableFuture<>();
             lh.asyncAddEntry(Unpooled.wrappedBuffer(entry.array()),
                 (int rc, LedgerHandle lh1, long entryId, Object ctx) -> {
-                SynchCallbackUtils.finish(rc, null, done);
+                SyncCallbackUtils.finish(rc, null, done);
             }, null);
             done.get();
         } catch (ExecutionException ee) {
@@ -227,7 +227,7 @@ public class BookieWriteLedgerTest extends
             CompletableFuture<Object> done = new CompletableFuture<>();
             lh.asyncAddEntry(entry.array(),
                 (int rc, LedgerHandle lh1, long entryId, Object ctx) -> {
-                SynchCallbackUtils.finish(rc, null, done);
+                SyncCallbackUtils.finish(rc, null, done);
             }, null);
             done.get();
         } catch (ExecutionException ee) {
@@ -240,7 +240,7 @@ public class BookieWriteLedgerTest extends
             CompletableFuture<Object> done = new CompletableFuture<>();
             lh.asyncAddEntry(entry.array(),0, 4,
                 (int rc, LedgerHandle lh1, long entryId, Object ctx) -> {
-                SynchCallbackUtils.finish(rc, null, done);
+                SyncCallbackUtils.finish(rc, null, done);
             }, null);
             done.get();
         } catch (ExecutionException ee) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
new file mode 100644
index 0000000..6ef8901
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Optional;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.client.api.CreateBuilder;
+import org.apache.bookkeeper.client.api.DeleteBuilder;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.junit.After;
+import org.junit.Before;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.doAnswer;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for Mock-based Client testcases
+ */
+public abstract class MockBookKeeperTestCase {
+
+    private final static Logger LOG = LoggerFactory.getLogger(MockBookKeeperTestCase.class);
+
+    protected ScheduledExecutorService scheduler;
+    protected OrderedSafeExecutor executor;
+    protected BookKeeper bk;
+    protected BookieClient bookieClient;
+    protected LedgerManager ledgerManager;
+    protected LedgerIdGenerator ledgerIdGenerator;
+
+    private BookieWatcher bookieWatcher;
+
+    protected ConcurrentMap<Long, LedgerMetadata> mockLedgerMetadataRegistry;
+    protected AtomicLong mockNextLedgerId;
+    protected ConcurrentSkipListSet<Long> fencedLedgers;
+    protected ConcurrentMap<Long, Map<BookieSocketAddress, Map<Long, MockEntry>>> mockLedgerData;
+
+    private Map<BookieSocketAddress, Map<Long, MockEntry>> getMockLedgerContents(long ledgerId) {
+        return mockLedgerData.computeIfAbsent(ledgerId, (id) -> new ConcurrentHashMap<>());
+    }
+
+    private Map<Long, MockEntry> getMockLedgerContentsInBookie(long ledgerId, BookieSocketAddress bookieSocketAddress) {
+        return getMockLedgerContents(ledgerId).computeIfAbsent(bookieSocketAddress, (addr) -> new ConcurrentHashMap<>());
+    }
+
+    private MockEntry getMockLedgerEntry(long ledgerId, BookieSocketAddress bookieSocketAddress, long entryId) {
+        return getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).get(entryId);
+    }
+
+    private static final class MockEntry {
+
+        byte[] payload;
+        long lastAddConfirmed;
+
+        public MockEntry(byte[] payload, long lastAddConfirmed) {
+            this.payload = payload;
+            this.lastAddConfirmed = lastAddConfirmed;
+        }
+
+    }
+
+    @Before
+    public void setup() throws Exception {
+        mockLedgerMetadataRegistry = new ConcurrentHashMap<>();
+        mockLedgerData = new ConcurrentHashMap<>();
+        mockNextLedgerId = new AtomicLong(1);
+        fencedLedgers = new ConcurrentSkipListSet<>();
+        scheduler = new ScheduledThreadPoolExecutor(4);
+        executor = OrderedSafeExecutor.newBuilder().build();
+        bookieWatcher = mock(BookieWatcher.class);
+
+        bookieClient = mock(BookieClient.class);
+        ledgerManager = mock(LedgerManager.class);
+        ledgerIdGenerator = mock(LedgerIdGenerator.class);
+
+        bk = mock(BookKeeper.class);
+
+        NullStatsLogger nullStatsLogger = setupLoggers();
+
+        when(bk.getCloseLock()).thenReturn(new ReentrantReadWriteLock());
+        when(bk.isClosed()).thenReturn(false);
+        when(bk.getBookieWatcher()).thenReturn(bookieWatcher);
+        when(bk.getExplicitLacInterval()).thenReturn(0);
+        when(bk.getMainWorkerPool()).thenReturn(executor);
+        when(bk.getBookieClient()).thenReturn(bookieClient);
+        when(bk.getScheduler()).thenReturn(scheduler);
+        when(bk.getReadSpeculativeRequestPolicy()).thenReturn(Optional.absent());
+        when(bk.getConf()).thenReturn(new ClientConfiguration());
+        when(bk.getStatsLogger()).thenReturn(nullStatsLogger);
+        when(bk.getLedgerManager()).thenReturn(ledgerManager);
+        when(bk.getLedgerIdGenerator()).thenReturn(ledgerIdGenerator);
+
+        setupLedgerIdGenerator();
+
+        setupCreateLedgerMetadata();
+        setupReadLedgerMetadata();
+        setupWriteLedgerMetadata();
+        setupRemoveLedgerMetadata();
+        setupRegisterLedgerMetadataListener();
+        setupBookieWatcherForNewEnsemble();
+        setupBookieClientReadEntry();
+        setupBookieClientAddEntry();
+    }
+
+    protected NullStatsLogger setupLoggers() {
+        NullStatsLogger nullStatsLogger = new NullStatsLogger();
+        when(bk.getOpenOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        when(bk.getRecoverOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        when(bk.getAddOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        when(bk.getReadOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        when(bk.getDeleteOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        when(bk.getCreateOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        when(bk.getRecoverAddCountLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        when(bk.getRecoverReadCountLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        return nullStatsLogger;
+    }
+
+    @After
+    public void tearDown() {
+        scheduler.shutdown();
+        executor.shutdown();
+    }
+
+    protected void setBookkeeperConfig(ClientConfiguration config) {
+        when(bk.getConf()).thenReturn(config);
+    }
+
+    protected CreateBuilder newCreateLedgerOp() {
+        return new LedgerCreateOp.CreateBuilderImpl(bk);
+    }
+
+    protected OpenBuilder newOpenLedgerOp() {
+        return new LedgerOpenOp.OpenBuilderImpl(bk);
+    }
+
+    protected DeleteBuilder newDeleteLedgerOp() {
+        return new LedgerDeleteOp.DeleteBuilderImpl(bk);
+    }
+
+    protected void closeBookkeeper() {
+        when(bk.isClosed()).thenReturn(true);
+    }
+
+    protected BookieSocketAddress generateBookieSocketAddress(int index) {
+        return new BookieSocketAddress("localhost", 1111 + index);
+    }
+
+    protected ArrayList<BookieSocketAddress> generateNewEnsemble(int ensembleSize) {
+        ArrayList<BookieSocketAddress> ensemble = new ArrayList<>(ensembleSize);
+        for (int i = 0; i < ensembleSize; i++) {
+            ensemble.add(generateBookieSocketAddress(i));
+        }
+        return ensemble;
+    }
+
+    private void setupBookieWatcherForNewEnsemble() throws BKException.BKNotEnoughBookiesException {
+        when(bookieWatcher.newEnsemble(anyInt(), anyInt(), anyInt(), any()))
+            .thenAnswer((Answer<ArrayList<BookieSocketAddress>>) new Answer<ArrayList<BookieSocketAddress>>() {
+                @Override
+                @SuppressWarnings("unchecked")
+                public ArrayList<BookieSocketAddress> answer(InvocationOnMock invocation) throws Throwable {
+                    Object[] args = invocation.getArguments();
+                    int ensembleSize = (Integer) args[0];
+                    return generateNewEnsemble(ensembleSize);
+                }
+            });
+    }
+
+    private void submit(Runnable operation) {
+        try {
+            scheduler.submit(operation);
+        } catch (RejectedExecutionException rejected) {
+            operation.run();
+        }
+    }
+
+    protected void registerMockEntryForRead(long ledgerId, long entryId, BookieSocketAddress bookieSocketAddress,
+        byte[] entryData, long lastAddConfirmed) {
+        getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).put(entryId, new MockEntry(entryData, lastAddConfirmed));
+    }
+
+    protected void registerMockLedgerMetadata(long ledgerId, LedgerMetadata ledgerMetadata) {
+        mockLedgerMetadataRegistry.put(ledgerId, ledgerMetadata);
+    }
+
+    protected void setNewGeneratedLedgerId(long ledgerId) {
+        mockNextLedgerId.set(ledgerId);
+        setupLedgerIdGenerator();
+    }
+
+    protected LedgerMetadata getLedgerMetadata(long ledgerId) {
+        return mockLedgerMetadataRegistry.get(ledgerId);
+    }
+
+    private void setupReadLedgerMetadata() {
+        doAnswer((Answer<Void>) new Answer<Void>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                Object[] args = invocation.getArguments();
+                Long ledgerId = (Long) args[0];
+                BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[1];
+                LedgerMetadata ledgerMetadata = mockLedgerMetadataRegistry.get(ledgerId);
+                if (ledgerMetadata == null) {
+                    cb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null);
+                } else {
+                    cb.operationComplete(BKException.Code.OK, new LedgerMetadata(ledgerMetadata));
+                }
+                return null;
+            }
+        }).when(ledgerManager).readLedgerMetadata(anyLong(), any());
+    }
+
+    private void setupRemoveLedgerMetadata() {
+        doAnswer((Answer<Void>) new Answer<Void>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                Object[] args = invocation.getArguments();
+                Long ledgerId = (Long) args[0];
+                BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
+                if (mockLedgerMetadataRegistry.remove(ledgerId) != null) {
+                    cb.operationComplete(BKException.Code.OK, null);
+                } else {
+                    cb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null);
+                }
+                return null;
+            }
+        }).when(ledgerManager).removeLedgerMetadata(anyLong(), any(), any());
+    }
+
+    private void setupRegisterLedgerMetadataListener() {
+        doAnswer((Answer<Void>) new Answer<Void>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                return null;
+            }
+        }).when(ledgerManager).registerLedgerMetadataListener(anyLong(), any());
+    }
+
+    private void setupLedgerIdGenerator() {
+        Mockito.doAnswer((Answer<Void>) new Answer<Void>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                Object[] args = invocation.getArguments();
+                BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[0];
+                cb.operationComplete(BKException.Code.OK, mockNextLedgerId.getAndIncrement());
+                return null;
+            }
+        }).when(ledgerIdGenerator).generateLedgerId(any());
+    }
+
+    private void setupCreateLedgerMetadata() {
+        doAnswer((Answer<Void>) new Answer<Void>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                Object[] args = invocation.getArguments();
+                BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
+                Long ledgerId = (Long) args[0];
+                LedgerMetadata ledgerMetadata = (LedgerMetadata) args[1];
+                mockLedgerMetadataRegistry.put(ledgerId, new LedgerMetadata(ledgerMetadata));
+                cb.operationComplete(BKException.Code.OK, null);
+                return null;
+            }
+        }).when(ledgerManager).createLedgerMetadata(anyLong(), any(), any());
+    }
+
+    private void setupWriteLedgerMetadata() {
+        doAnswer((Answer<Void>) new Answer<Void>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                Object[] args = invocation.getArguments();
+                Long ledgerId = (Long) args[0];
+                LedgerMetadata metadata = (LedgerMetadata) args[1];
+                BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
+                mockLedgerMetadataRegistry.put(ledgerId, new LedgerMetadata(metadata));
+                cb.operationComplete(BKException.Code.OK, null);
+                return null;
+            }
+        }).when(ledgerManager).writeLedgerMetadata(anyLong(), any(), any());
+    }
+
+    protected void setupBookieClientReadEntry() {
+        doAnswer((Answer) (InvocationOnMock invokation) -> {
+            Object[] args = invokation.getArguments();
+            BookkeeperInternalCallbacks.ReadEntryCallback callback = (BookkeeperInternalCallbacks.ReadEntryCallback) args[4];
+            BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
+            long ledgerId = (Long) args[1];
+            long entryId = (Long) args[3];
+
+            DigestManager macManager = new CRC32DigestManager(ledgerId);
+            fencedLedgers.add(ledgerId);
+            submit(() -> {
+                MockEntry mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
+                if (mockEntry != null) {
+                    LOG.info("readEntryAndFenceLedger - found mock entry {}@{} at {}", ledgerId, entryId, bookieSocketAddress);
+                    ByteBuf entry = macManager.computeDigestAndPackageForSending(entryId, mockEntry.lastAddConfirmed,
+                        mockEntry.payload.length, Unpooled.wrappedBuffer(mockEntry.payload));
+                    callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, Unpooled.copiedBuffer(entry), args[5]);
+                    entry.release();
+                } else {
+                    LOG.info("readEntryAndFenceLedger - no such mock entry {}@{} at {}", ledgerId, entryId, bookieSocketAddress);
+                    callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId, entryId, null, args[5]);
+                }
+            });
+            return null;
+        }).when(bookieClient).readEntryAndFenceLedger(any(), anyLong(), any(), anyLong(),
+            any(BookkeeperInternalCallbacks.ReadEntryCallback.class), any());
+
+        doAnswer((Answer) (InvocationOnMock invokation) -> {
+            Object[] args = invokation.getArguments();
+            BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
+            long ledgerId = (Long) args[1];
+            long entryId = (Long) args[2];
+            BookkeeperInternalCallbacks.ReadEntryCallback callback = (BookkeeperInternalCallbacks.ReadEntryCallback) args[3];
+
+            DigestManager macManager = new CRC32DigestManager(ledgerId);
+
+            submit(() -> {
+                MockEntry mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
+                if (mockEntry != null) {
+                    LOG.info("readEntry - found mock entry {}@{} at {}", ledgerId, entryId, bookieSocketAddress);
+                    ByteBuf entry = macManager.computeDigestAndPackageForSending(entryId,
+                        mockEntry.lastAddConfirmed, mockEntry.payload.length, Unpooled.wrappedBuffer(mockEntry.payload));
+                    callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, Unpooled.copiedBuffer(entry), args[4]);
+                    entry.release();
+                } else {
+                    LOG.info("readEntry - no such mock entry {}@{} at {}", ledgerId, entryId, bookieSocketAddress);
+                    callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId, entryId, null, args[4]);
+                }
+            });
+            return null;
+        }).when(bookieClient).readEntry(any(), anyLong(), anyLong(),
+            any(BookkeeperInternalCallbacks.ReadEntryCallback.class), any());
+    }
+
+    private static byte[] extractEntryPayload(long ledgerId, long entryId, ByteBuf toSend) throws BKException.BKDigestMatchException {
+        ByteBuf toSendCopy = Unpooled.copiedBuffer(toSend);
+        toSendCopy.resetReaderIndex();
+        DigestManager macManager = new CRC32DigestManager(ledgerId);
+        ByteBuf content = macManager.verifyDigestAndReturnData(entryId, toSendCopy);
+        byte[] entry = new byte[content.readableBytes()];
+        content.readBytes(entry);
+        content.resetReaderIndex();
+        content.release();
+        return entry;
+    }
+
+    protected void setupBookieClientAddEntry() {
+        doAnswer((Answer) (InvocationOnMock invokation) -> {
+            Object[] args = invokation.getArguments();
+            BookkeeperInternalCallbacks.WriteCallback callback = (BookkeeperInternalCallbacks.WriteCallback) args[5];
+            BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
+            long ledgerId = (Long) args[1];
+            long entryId = (Long) args[3];
+            ByteBuf toSend = (ByteBuf) args[4];
+            Object ctx = args[6];
+
+            byte[] entry = extractEntryPayload(ledgerId, entryId, toSend);
+
+            submit(() -> {
+                boolean fenced = fencedLedgers.contains(ledgerId);
+                if (fenced) {
+                    callback.writeComplete(BKException.Code.LedgerFencedException,
+                        ledgerId, entryId, bookieSocketAddress, ctx);
+                } else {
+                    if (getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).isEmpty()) {
+                        registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieSocketAddress,
+                            new byte[0], BookieProtocol.INVALID_ENTRY_ID);
+                    }
+                    registerMockEntryForRead(ledgerId, entryId, bookieSocketAddress, entry, ledgerId);
+                    callback.writeComplete(BKException.Code.OK, ledgerId, entryId, bookieSocketAddress, ctx);
+                }
+            });
+            return null;
+        }).when(bookieClient).addEntry(any(BookieSocketAddress.class),
+            anyLong(), any(byte[].class),
+            anyLong(), any(ByteBuf.class),
+            any(BookkeeperInternalCallbacks.WriteCallback.class),
+            any(), anyInt());
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 1b6f2e7..a2a671a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -361,7 +361,7 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
         final CountDownLatch addLatch = new CountDownLatch(1);
         final AtomicBoolean addSuccess = new AtomicBoolean(false);
         LOG.info("Add entry {} with lac = {}", entryId, lac);
-        lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(0), lh.getId(), lh.ledgerKey, entryId, toSend,
+        lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(0), lh.getId(), lh.ledgerKey, entryId, toSend,
             new WriteCallback() {
                 @Override
                 public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
index 0e4d72d..614dd5e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -96,7 +96,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
             ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
             VerificationCallback callback = new VerificationCallback(addresses.size());
             for (BookieSocketAddress addr : addresses) {
-                bkc.bookieClient.readEntry(addr, lh.getId(), eid, callback, addr);
+                bkc.getBookieClient().readEntry(addr, lh.getId(), eid, callback, addr);
             }
             callback.latch.await();
             assertEquals(expectedSuccess, callback.numSuccess.get());
@@ -113,7 +113,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
             ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
             VerificationCallback callback = new VerificationCallback(addresses.size());
             for (BookieSocketAddress addr : addresses) {
-                bkc.bookieClient.readEntry(addr, lh.getId(), eid, callback, addr);
+                bkc.getBookieClient().readEntry(addr, lh.getId(), eid, callback, addr);
             }
             callback.latch.await();
             assertTrue(expectedSuccess >= callback.numSuccess.get());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
new file mode 100644
index 0000000..e878249
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
@@ -0,0 +1,265 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.BKException.BKDuplicateEntryIdException;
+import org.apache.bookkeeper.client.BKException.BKLedgerFencedException;
+import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
+import org.apache.bookkeeper.client.BKException.BKUnauthorizedAccessException;
+import org.apache.bookkeeper.client.MockBookKeeperTestCase;
+import org.junit.Test;
+
+/**
+ * Unit tests of classes in this package
+ */
+public class BookKeeperApiTest extends MockBookKeeperTestCase {
+
+    final static byte[] data = "foo".getBytes(UTF_8);
+    final static byte[] password = "password".getBytes(UTF_8);
+
+    @Test
+    public void testWriteHandle() throws Exception {
+        try (WriteHandle writer
+            = result(newCreateLedgerOp()
+                .withAckQuorumSize(1)
+                .withWriteQuorumSize(2)
+                .withEnsembleSize(3)
+                .withPassword(password)
+                .execute());) {
+
+            // test writer is able to write
+            result(writer.append(ByteBuffer.wrap(data)));
+            result(writer.append(Unpooled.wrappedBuffer(data)));
+            long expectedEntryId = result(writer.append(ByteBuffer.wrap(data)));
+            assertEquals(expectedEntryId, writer.getLastAddConfirmed());
+        }
+    }
+
+    @Test
+    public void testWriteAdvHandle() throws Exception {
+        long ledgerId = 12345;
+        setNewGeneratedLedgerId(ledgerId);
+        try (WriteAdvHandle writer
+            = result(newCreateLedgerOp()
+                .withAckQuorumSize(1)
+                .withWriteQuorumSize(2)
+                .withEnsembleSize(3)
+                .withPassword(password)
+                .makeAdv()
+                .execute());) {
+            assertEquals(ledgerId, writer.getId());
+
+            // test writer is able to write
+            long entryId = 0;
+            result(writer.write(entryId++, ByteBuffer.wrap(data)));
+            result(writer.write(entryId++, Unpooled.wrappedBuffer(data)));
+            long expectedEntryId = result(writer.write(entryId++, ByteBuffer.wrap(data)));
+            assertEquals(expectedEntryId, writer.getLastAddConfirmed());
+        }
+    }
+
+    @Test
+    public void testWriteAdvHandleWithFixedLedgerId() throws Exception {
+        setNewGeneratedLedgerId(12345);
+        try (WriteAdvHandle writer
+            = result(newCreateLedgerOp()
+                .withAckQuorumSize(1)
+                .withWriteQuorumSize(2)
+                .withEnsembleSize(3)
+                .withPassword(password)
+                .makeAdv()
+                .withLedgerId(1234)
+                .execute());) {
+            assertEquals(1234, writer.getId());
+
+            // test writer is able to write
+            long entryId = 0;
+            result(writer.write(entryId++, ByteBuffer.wrap(data)));
+            result(writer.write(entryId++, Unpooled.wrappedBuffer(data)));
+            long expectedEntryId = result(writer.write(entryId++, ByteBuffer.wrap(data)));
+            assertEquals(expectedEntryId, writer.getLastAddConfirmed());
+        }
+    }
+
+    @Test(expected = BKDuplicateEntryIdException.class)
+    public void testWriteAdvHandleBKDuplicateEntryId() throws Exception {
+        try (WriteAdvHandle writer
+            = result(newCreateLedgerOp()
+                .withAckQuorumSize(1)
+                .withWriteQuorumSize(2)
+                .withEnsembleSize(3)
+                .withPassword(password)
+                .makeAdv()
+                .withLedgerId(1234)
+                .execute());) {
+            assertEquals(1234, writer.getId());
+            long entryId = 0;
+            result(writer.write(entryId++, ByteBuffer.wrap(data)));
+            result(writer.write(entryId - 1, ByteBuffer.wrap(data)));
+        }
+    }
+
+    @Test(expected = BKUnauthorizedAccessException.class)
+    public void testOpenLedgerUnauthorized() throws Exception {
+        long lId;
+        try (WriteHandle writer
+            = result(newCreateLedgerOp()
+                .withAckQuorumSize(1)
+                .withWriteQuorumSize(2)
+                .withEnsembleSize(3)
+                .withPassword(password)
+                .execute());) {
+            lId = writer.getId();
+        }
+        try (ReadHandle reader
+            = result(newOpenLedgerOp()
+                .withPassword("bad-password".getBytes(UTF_8))
+                .withLedgerId(lId)
+                .execute())) {
+        }
+    }
+
+    @Test(expected = BKDigestMatchException.class)
+    public void testOpenLedgerDigestUnmatched() throws Exception {
+        long lId;
+        try (WriteHandle writer
+            = result(newCreateLedgerOp()
+                .withAckQuorumSize(1)
+                .withWriteQuorumSize(2)
+                .withEnsembleSize(3)
+                .withDigestType(DigestType.MAC)
+                .withPassword(password)
+                .execute());) {
+            lId = writer.getId();
+        }
+        try (ReadHandle reader = result(newOpenLedgerOp()
+            .withDigestType(DigestType.CRC32)
+            .withPassword(password)
+            .withLedgerId(lId)
+            .execute())) {
+        }
+    }
+
+    @Test
+    public void testOpenLedgerRead() throws Exception {
+        long lId;
+        try (WriteHandle writer
+            = result(newCreateLedgerOp()
+                .withAckQuorumSize(1)
+                .withWriteQuorumSize(2)
+                .withEnsembleSize(3)
+                .withPassword(password)
+                .execute());) {
+            lId = writer.getId();
+            // write data and populate LastAddConfirmed
+            result(writer.append(ByteBuffer.wrap(data)));
+            result(writer.append(ByteBuffer.wrap(data)));
+            result(writer.append(ByteBuffer.wrap(data)));
+        }
+
+        try (ReadHandle reader = result(newOpenLedgerOp()
+            .withPassword(password)
+            .withRecovery(false)
+            .withLedgerId(lId)
+            .execute())) {
+            assertEquals(2, reader.getLastAddConfirmed());
+            assertEquals(2, result(reader.readLastAddConfirmed()).intValue());
+            assertEquals(2, result(reader.tryReadLastAddConfirmed()).intValue());
+            checkEntries(result(reader.read(0, reader.getLastAddConfirmed())), data);
+            checkEntries(result(reader.readUnconfirmed(0, reader.getLastAddConfirmed())), data);
+        }
+    }
+
+    @Test(expected = BKLedgerFencedException.class)
+    public void testOpenLedgerWithRecovery() throws Exception {
+        long lId;
+        try (WriteHandle writer = result(newCreateLedgerOp()
+            .withAckQuorumSize(1)
+            .withWriteQuorumSize(2)
+            .withEnsembleSize(3)
+            .withPassword(password)
+            .execute());) {
+            lId = writer.getId();
+
+            result(writer.append(ByteBuffer.wrap(data)));
+            result(writer.append(ByteBuffer.wrap(data)));
+
+            // open with fencing
+            try (ReadHandle reader = result(newOpenLedgerOp()
+                .withPassword(password)
+                .withRecovery(true)
+                .withLedgerId(lId)
+                .execute())) {
+            }
+
+            result(writer.append(ByteBuffer.wrap(data)));
+
+        }
+    }
+
+    @Test(expected = BKNoSuchLedgerExistsException.class)
+    public void testDeleteLedger() throws Exception {
+        long lId;
+
+        try (WriteHandle writer = result(newCreateLedgerOp()
+            .withPassword(password)
+            .execute());) {
+            lId = writer.getId();
+        }
+
+        result(newDeleteLedgerOp().withLedgerId(lId).execute());
+
+        result(newOpenLedgerOp()
+            .withPassword(password)
+            .withLedgerId(lId)
+            .execute());
+    }
+
+    @Test(expected = BKNoSuchLedgerExistsException.class)
+    public void testCannotDeleteLedgerTwice() throws Exception {
+        long lId;
+
+        try (WriteHandle writer = result(newCreateLedgerOp()
+            .withPassword(password)
+            .execute());) {
+            lId = writer.getId();
+        }
+        result(newDeleteLedgerOp().withLedgerId(lId).execute());
+        result(newDeleteLedgerOp().withLedgerId(lId).execute());
+    }
+
+    private static void checkEntries(Iterable<LedgerEntry> entries, byte[] data)
+        throws InterruptedException, BKException {
+        for (LedgerEntry le : entries) {
+            assertArrayEquals(data, le.getEntry());
+        }
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
new file mode 100644
index 0000000..946ee4e
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -0,0 +1,318 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.bookkeeper.client.BKException.BKClientClosedException;
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
+import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.MockBookKeeperTestCase;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
+
+import org.junit.Test;
+
+/**
+ * Unit tests of builders
+ */
+public class BookKeeperBuildersTest extends MockBookKeeperTestCase {
+
+    private final static int ensembleSize = 3;
+    private final static int writeQuorumSize = 2;
+    private final static int ackQuorumSize = 1;
+    private final static long ledgerId = 12342L;
+    private final static Map<String, byte[]> customMetadata = new HashMap<>();
+    private final static byte[] password = new byte[3];
+    private final static byte[] entryData = new byte[32];
+
+    @Test
+    public void testCreateLedger() throws Exception {
+        setNewGeneratedLedgerId(ledgerId);
+        WriteHandle writer = newCreateLedgerOp()
+            .withAckQuorumSize(ackQuorumSize)
+            .withEnsembleSize(ensembleSize)
+            .withWriteQuorumSize(writeQuorumSize)
+            .withCustomMetadata(customMetadata)
+            .withPassword(password)
+            .execute()
+            .get();
+        assertEquals(ledgerId, writer.getId());
+        LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+        assertEquals(ensembleSize, metadata.getEnsembleSize());
+        assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+        assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+        assertArrayEquals(password, metadata.getPassword());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailEnsembleSize0() throws Exception {
+        result(newCreateLedgerOp()
+            .withEnsembleSize(0)
+            .withPassword(password)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailWriteQuorumSize0() throws Exception {
+        result(newCreateLedgerOp()
+            .withEnsembleSize(2)
+            .withWriteQuorumSize(0)
+            .withPassword(password)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailAckQuorumSize0() throws Exception {
+        result(newCreateLedgerOp()
+            .withEnsembleSize(2)
+            .withWriteQuorumSize(1)
+            .withAckQuorumSize(0)
+            .withPassword(password)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailWriteQuorumSizeGreaterThanEnsembleSize() throws Exception {
+        result(newCreateLedgerOp()
+            .withEnsembleSize(1)
+            .withWriteQuorumSize(2)
+            .withAckQuorumSize(1)
+            .withPassword(password)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailAckQuorumSizeGreaterThanWriteQuorumSize() throws Exception {
+        result(newCreateLedgerOp()
+            .withEnsembleSize(1)
+            .withWriteQuorumSize(1)
+            .withAckQuorumSize(2)
+            .withPassword(password)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailNoPassword() throws Exception {
+        result(newCreateLedgerOp()
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailPasswordNull() throws Exception {
+        result(newCreateLedgerOp()
+            .withPassword(null)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailCustomMetadataNull() throws Exception {
+        result(newCreateLedgerOp()
+            .withCustomMetadata(null)
+            .withPassword(password)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailDigestTypeNullAndAutodetectionTrue() throws Exception {
+        ClientConfiguration config = new ClientConfiguration();
+        config.setEnableDigestTypeAutodetection(true);
+        setBookkeeperConfig(config);
+        result(newCreateLedgerOp()
+            .withDigestType(null)
+            .withPassword(password)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailDigestTypeNullAndAutodetectionFalse() throws Exception {
+        ClientConfiguration config = new ClientConfiguration();
+        config.setEnableDigestTypeAutodetection(false);
+        setBookkeeperConfig(config);
+        result(newCreateLedgerOp()
+            .withDigestType(null)
+            .withPassword(password)
+            .execute());
+        fail("shoud not be able to create a ledger with such specs");
+    }
+
+    @Test(expected = BKClientClosedException.class)
+    public void testFailDigestTypeNullAndBookkKeeperClosed() throws Exception {
+        closeBookkeeper();
+        result(newCreateLedgerOp()
+            .withPassword(password)
+            .execute());
+        fail("shoud not be able to create a ledger, client is closed");
+    }
+
+    @Test
+    public void testCreateAdvLedger() throws Exception {
+        setNewGeneratedLedgerId(ledgerId);
+        WriteAdvHandle writer = newCreateLedgerOp()
+            .withAckQuorumSize(ackQuorumSize)
+            .withEnsembleSize(ensembleSize)
+            .withPassword(password)
+            .withWriteQuorumSize(writeQuorumSize)
+            .withCustomMetadata(customMetadata)
+            .makeAdv()
+            .execute()
+            .get();
+        assertEquals(ledgerId, writer.getId());
+        LedgerMetadata metadata = getLedgerMetadata(ledgerId);
+        assertEquals(ensembleSize, metadata.getEnsembleSize());
+        assertEquals(ackQuorumSize, metadata.getAckQuorumSize());
+        assertEquals(writeQuorumSize, metadata.getWriteQuorumSize());
+        assertArrayEquals(password, metadata.getPassword());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailCreateAdvLedgerBadFixedLedgerIdMinus1() throws Exception {
+        result(newCreateLedgerOp()
+            .withPassword(password)
+            .makeAdv()
+            .withLedgerId(-1)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testFailCreateAdvLedgerBadFixedLedgerIdNegative() throws Exception {
+        result(newCreateLedgerOp()
+            .withPassword(password)
+            .makeAdv()
+            .withLedgerId(-2)
+            .execute());
+        fail("shoud not be able to create a ledger with such specs");
+    }
+
+    @Test(expected = BKNoSuchLedgerExistsException.class)
+    public void testOpenLedgerNoId() throws Exception {
+        result(newOpenLedgerOp().execute());
+    }
+
+    @Test(expected = BKNoSuchLedgerExistsException.class)
+    public void testOpenLedgerBadId() throws Exception {
+        result(newOpenLedgerOp()
+            .withPassword(password)
+            .withLedgerId(ledgerId)
+            .execute());
+    }
+
+    @Test(expected = BKClientClosedException.class)
+    public void testOpenLedgerClientClosed() throws Exception {
+        closeBookkeeper();
+        result(newOpenLedgerOp()
+            .withPassword(password)
+            .withLedgerId(ledgerId)
+            .execute());
+    }
+
+    @Test
+    public void testOpenLedgerNoRecovery() throws Exception {
+        LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize,
+            writeQuorumSize, ackQuorumSize, password, customMetadata);
+        registerMockLedgerMetadata(ledgerId, ledgerMetadata);
+
+        ledgerMetadata.getEnsembles().values().forEach(bookieAddressList -> {
+            bookieAddressList.forEach(bookieAddress -> {
+                registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1);
+                registerMockEntryForRead(ledgerId, 0, bookieAddress, entryData, -1);
+            });
+        });
+
+        result(newOpenLedgerOp()
+            .withPassword(ledgerMetadata.getPassword())
+            .withDigestType(DigestType.CRC32)
+            .withLedgerId(ledgerId)
+            .withRecovery(true)
+            .execute());
+    }
+
+    @Test
+    public void testOpenLedgerRecovery() throws Exception {
+        LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize,
+            writeQuorumSize, ackQuorumSize, password, customMetadata);
+        registerMockLedgerMetadata(ledgerId, ledgerMetadata);
+
+        ledgerMetadata.getEnsembles().values().forEach(bookieAddressList -> {
+            bookieAddressList.forEach(bookieAddress -> {
+                registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1);
+                registerMockEntryForRead(ledgerId, 0, bookieAddress, entryData, -1);
+            });
+        });
+        result(newOpenLedgerOp()
+            .withPassword(ledgerMetadata.getPassword())
+            .withDigestType(DigestType.CRC32)
+            .withLedgerId(ledgerId)
+            .withRecovery(false)
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testDeleteLedgerNoLedgerId() throws Exception {
+        result(newDeleteLedgerOp()
+            .execute());
+    }
+
+    @Test(expected = BKIncorrectParameterException.class)
+    public void testDeleteLedgerBadLedgerId() throws Exception {
+        result(newDeleteLedgerOp()
+            .withLedgerId(-1)
+            .execute());
+    }
+
+    @Test
+    public void testDeleteLedger() throws Exception {
+        LedgerMetadata ledgerMetadata = generateLedgerMetadata(ensembleSize,
+            writeQuorumSize, ackQuorumSize, password, customMetadata);
+        registerMockLedgerMetadata(ledgerId, ledgerMetadata);
+
+        result(newDeleteLedgerOp()
+            .withLedgerId(ledgerId)
+            .execute());
+    }
+
+    @Test(expected = BKClientClosedException.class)
+    public void testDeleteLedgerBookKeeperClosed() throws Exception {
+        closeBookkeeper();
+        result(newDeleteLedgerOp()
+            .withLedgerId(ledgerId)
+            .execute());
+    }
+
+    protected LedgerMetadata generateLedgerMetadata(int ensembleSize,
+        int writeQuorumSize, int ackQuorumSize, byte[] password,
+        Map<String, byte[]> customMetadata) {
+        LedgerMetadata ledgerMetadata = new LedgerMetadata(ensembleSize, writeQuorumSize,
+            ackQuorumSize, BookKeeper.DigestType.CRC32, password, customMetadata);
+        ledgerMetadata.addEnsemble(0, generateNewEnsemble(ensembleSize));
+        return ledgerMetadata;
+    }
+
+}
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 1925cde..e239e67 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -47,6 +47,14 @@
     <Bug pattern="EI_EXPOSE_REP" />
   </Match>
   <Match>
+    <Class name="org.apache.bookkeeper.client.LedgerEntry" />
+    <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+  </Match>
+  <Match>
+    <Class name="org.apache.bookkeeper.client.BookKeeper" />
+    <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+  </Match>
+  <Match>
     <Class name="org.apache.bookkeeper.common.concurrent.FutureUtils"/>
     <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" />
   </Match>
@@ -71,4 +79,12 @@
     <Bug category="MT_CORRECTNESS"/>
     <Class name="~org.apache.bookkeeper.util.collections\.[^.]+"/>
   </And>
+  <And>
+    <Class name="org.apache.bookkeeper.client.BKException" />
+    <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
+  </And>
+  <And>
+    <Class name="org.apache.bookkeeper.client.BKException$Code" />
+    <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE"/>
+  </And>
 </FindBugsFilter>
diff --git a/buildtools/src/main/resources/bookkeeper/server-suppressions.xml b/buildtools/src/main/resources/bookkeeper/server-suppressions.xml
index 470aff7..d5c14b2 100644
--- a/buildtools/src/main/resources/bookkeeper/server-suppressions.xml
+++ b/buildtools/src/main/resources/bookkeeper/server-suppressions.xml
@@ -22,7 +22,7 @@
     <suppress checks="JavadocPackage" files=".*[\\/]examples[\\/].*"/>
     <!-- suppress packages by packages -->
     <suppress checks=".*" files=".*[\\/]bookie[\\/].*"/>
-    <suppress checks=".*" files=".*[\\/]client[\\/].*"/>
+    <suppress checks=".*" files=".*[\\/]client[\\/](?:[^\\/]+$|(?!api)|(?!impl)[^\\/]+[\\/])"/>
     <suppress checks=".*" files=".*[\\/]conf[\\/].*"/>
     <suppress checks=".*" files=".*[\\/]feature[\\/].*"/>
     <suppress checks=".*" files=".*[\\/]http[\\/].*"/>
diff --git a/pom.xml b/pom.xml
index 45e7eb5..7a4dddc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,8 +112,9 @@
     <netty-boringssl.version>2.0.3.Final</netty-boringssl.version>
     <slf4j.version>1.7.25</slf4j.version>
     <zookeeper.version>3.5.3-beta</zookeeper.version>
+    <findbugs-jsr305.version>3.0.0</findbugs-jsr305.version>
     <!-- plugin dependencies -->
-    <findbugs-maven-plugin.version>3.0.4</findbugs-maven-plugin.version>
+    <findbugs-maven-plugin.version>3.0.5</findbugs-maven-plugin.version>
     <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
     <maven-checkstyle-plugin.version>2.17</maven-checkstyle-plugin.version>
     <maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
@@ -226,7 +227,7 @@
         <configuration>
           <!-- Prevent missing javadoc comments from being marked as errors -->
           <additionalparam>-Xdoclint:none -notimestamp</additionalparam>
-          <subpackages>org.apache.bookkeeper.client:org.apache.bookkeeper.common.annotation:org.apache.bookkeeper.conf:org.apache.bookkeeper.feature:org.apache.bookkeeper.stats</subpackages>
+          <subpackages>org.apache.bookkeeper.client:org.apache.bookkeeper.client.api:org.apache.bookkeeper.common.annotation:org.apache.bookkeeper.conf:org.apache.bookkeeper.feature:org.apache.bookkeeper.stats</subpackages>
           <groups>
             <group>
               <title>Bookkeeper Client</title>

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>'].

Mime
View raw message