bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [bookkeeper] branch master updated: BOOKKEEPER-759: Delay Ensemble Change & Disable Ensemble Change
Date Thu, 29 Jun 2017 22:12:47 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e399df  BOOKKEEPER-759: Delay Ensemble Change & Disable Ensemble Change
5e399df is described below

commit 5e399df67c2aa1e5f228c62ba8533ca3293ab147
Author: Sijie Guo <sijieg@twitter.com>
AuthorDate: Thu Jun 29 15:12:40 2017 -0700

    BOOKKEEPER-759: Delay Ensemble Change & Disable Ensemble Change
    
    Descriptions of the changes in this PR:
    
    This pull request contains the changes around ensemble change.
    
    - Delay Ensemble Change: Provide a flag to allow delaying ensemble changes. If the flag is enabled, the client will not perform an ensemble change until a bookie failure breaks the ack quorum requirement.
    - Disable Ensemble Change: Provide a runtime feature flag to allow disabling ensemble change. The ensemble change behavior can be disabled on-the-fly via the FeatureProvider.
    
    ---
    Be sure to do all of the following to help us incorporate your contribution
    quickly and easily:
    
    - [x] Make sure the PR title is formatted like:
        `<Issue #>: Description of pull request`
        `e.g. Issue 123: Description ...`
    - [x] Make sure tests pass via `mvn clean apache-rat:check install findbugs:check`.
    - [x] Replace `<Issue #>` in the title with the actual Issue number, if there is one.
    
    ---
    
    Author: Sijie Guo <sijieg@twitter.com>
    Author: Sijie Guo <sijie@apache.org>
    
    Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Matteo Merli <mmerli@apache.org>
    
    This closes #202 from sijie/client_changes/delay_ensemble_changes
---
 .../org/apache/bookkeeper/client/BKException.java  |   1 +
 .../org/apache/bookkeeper/client/BookKeeper.java   |  12 +-
 .../bookkeeper/client/DistributionSchedule.java    |  43 ++-
 .../org/apache/bookkeeper/client/LedgerHandle.java | 292 ++++++++++-----
 .../org/apache/bookkeeper/client/PendingAddOp.java |  78 +++-
 .../apache/bookkeeper/client/PendingReadLacOp.java |   6 +-
 .../bookkeeper/client/PendingWriteLacOp.java       |   2 +-
 .../bookkeeper/client/ReadLastConfirmedOp.java     |   5 +-
 .../bookkeeper/client/ReadOnlyLedgerHandle.java    |  20 +-
 .../client/RoundRobinDistributionSchedule.java     |  54 ++-
 .../bookkeeper/conf/ClientConfiguration.java       |  52 +++
 .../org/apache/bookkeeper/proto/BookieClient.java  |  85 +++--
 .../bookkeeper/proto/PerChannelBookieClient.java   |   3 +
 .../bookkeeper/util/BookKeeperConstants.java       |   1 +
 .../bookkeeper/client/BookieRecoveryTest.java      |  29 +-
 .../client/RoundRobinDistributionScheduleTest.java |  12 +-
 .../bookkeeper/client/TestDelayEnsembleChange.java | 417 +++++++++++++++++++++
 .../client/TestDisableEnsembleChange.java          | 277 ++++++++++++++
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  35 +-
 .../src/test/resources/log4j.properties            |   7 +-
 20 files changed, 1238 insertions(+), 193 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index aa3ec08..e17d8b9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -114,6 +114,7 @@ public abstract class BKException extends Exception {
      *
      */
     public interface Code {
+        int UNINITIALIZED = 1;
         int OK = 0;
         int ReadException = -1;
         int QuorumException = -2;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 2d509d4..383fe3f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -46,6 +46,7 @@ import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.meta.CleanupLedgerManager;
@@ -119,7 +120,10 @@ public class BookKeeper implements AutoCloseable {
     final HashedWheelTimer requestTimer;
     final boolean ownTimer;
     final FeatureProvider featureProvider;
-    ScheduledExecutorService bookieInfoScheduler;
+    final ScheduledExecutorService bookieInfoScheduler;
+
+    // Features
+    final Feature disableEnsembleChangeFeature;
 
     // Ledger manager responsible for how to store ledger meta data
     final LedgerManagerFactory ledgerManagerFactory;
@@ -132,6 +136,7 @@ public class BookKeeper implements AutoCloseable {
 
     final ClientConfiguration conf;
     final int explicitLacInterval;
+    final boolean delayEnsembleChange;
 
     final Optional<SpeculativeRequestExecutionPolicy> readSpeculativeRequestPolicy;
 
@@ -295,6 +300,7 @@ public class BookKeeper implements AutoCloseable {
                        FeatureProvider featureProvider)
             throws IOException, InterruptedException, KeeperException {
         this.conf = conf;
+        this.delayEnsembleChange = conf.getDelayEnsembleChange();
 
         // initialize zookeeper client
         if (zkc == null) {
@@ -340,6 +346,9 @@ public class BookKeeper implements AutoCloseable {
         } else {
             this.featureProvider = featureProvider;
         }
+        
+        // get features
+        this.disableEnsembleChangeFeature = this.featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());
 
         // initialize scheduler
         ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
@@ -387,6 +396,7 @@ public class BookKeeper implements AutoCloseable {
             this.bookieInfoReader.start();
         } else {
             LOG.info("Weighted ledger placement is not enabled");
+            this.bookieInfoScheduler = null;
             this.bookieInfoReader = new BookieInfoReader(this, conf, null);
             this.bookieWatcher.readBookiesBlocking();
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index 6dba0cc..c0d78e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -17,7 +17,11 @@
  */
 package org.apache.bookkeeper.client;
 
+import org.apache.bookkeeper.net.BookieSocketAddress;
+
 import java.util.List;
+import java.util.Map;
+
 /**
  * This interface determins how entries are distributed among bookies.
  *
@@ -45,13 +49,32 @@ interface DistributionSchedule {
          * Add a bookie response and check if quorum has been met
          * @return true if quorum has been met, false otherwise
          */
-        public boolean addBookieAndCheck(int bookieIndexHeardFrom);
+        public boolean completeBookieAndCheck(int bookieIndexHeardFrom);
+
+        /**
+         * Received failure response from a bookie and check if ack quorum
+         * will be broken.
+         *
+         * @param bookieIndexHeardFrom
+         *          bookie index that failed.
+         * @param address
+         *          bookie address
+         * @return true if ack quorum is broken, false otherwise.
+         */
+        public boolean failBookieAndCheck(int bookieIndexHeardFrom, BookieSocketAddress address);
+
+        /**
+         * Return the list of bookies that already failed.
+         *
+         * @return the list of bookies that already failed.
+         */
+        public Map<Integer, BookieSocketAddress> getFailedBookies();
 
         /**
          * Invalidate a previous bookie response.
          * Used for reissuing write requests.
          */
-        public void removeBookie(int bookie);
+        public boolean removeBookieAndCheck(int bookie);
     }
 
     /**
@@ -66,19 +89,25 @@ interface DistributionSchedule {
      */
     public interface QuorumCoverageSet {
         /**
-         * Add a bookie to the set, and check if all quorum in the set
-         * have had the action performed for it.
+         * Add a bookie to the result set
+         *
          * @param bookieIndexHeardFrom Bookie we've just heard from
+         */
+        void addBookie(int bookieIndexHeardFrom, int rc);
+
+        /**
+         * check if all quorum in the set have had the action performed for it.
+         *
          * @return whether all quorums have been covered
          */
-        public boolean addBookieAndCheckCovered(int bookieIndexHeardFrom);
+        boolean checkCovered();
     }
 
     public QuorumCoverageSet getCoverageSet();
-    
+
     /**
      * Whether entry presents on given bookie index
-     * 
+     *
      * @param entryId
      *            - entryId to check the presence on given bookie index
      * @param bookieIndex
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 91700b6..9a96078 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -20,9 +20,8 @@
  */
 package org.apache.bookkeeper.client;
 
-import static com.google.common.base.Charsets.UTF_8;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
@@ -89,6 +88,7 @@ public class LedgerHandle implements AutoCloseable {
     final static public long INVALID_ENTRY_ID = BookieProtocol.INVALID_ENTRY_ID;
 
     final AtomicInteger blockAddCompletions = new AtomicInteger(0);
+    final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
     Queue<PendingAddOp> pendingAddOps;
     ExplicitLacFlushPolicy explicitLacFlushPolicy;
 
@@ -183,10 +183,14 @@ public class LedgerHandle implements AutoCloseable {
      *
      * @return the last confirmed entry id or {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID} if no entry has been confirmed
      */
-    public long getLastAddConfirmed() {
+    public synchronized long getLastAddConfirmed() {
         return lastAddConfirmed;
     }
 
+    synchronized void setLastAddConfirmed(long lac) {
+        this.lastAddConfirmed = lac;
+    }
+
     /**
      * Get the entry id of the last entry that has been enqueued for addition (but
      * may not have possibly been persited to the ledger)
@@ -425,8 +429,8 @@ public class LedgerHandle implements AutoCloseable {
                                 @Override
                                 public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
                                     if (newrc != BKException.Code.OK) {
-                                        LOG.error("Error reading new metadata from ledger " + ledgerId
-                                                  + " when closing, code=" + newrc);
+                                        LOG.error("Error reading new metadata from ledger {} when closing, code={}",
+                                                ledgerId, newrc);
                                         cb.closeComplete(rc, LedgerHandle.this, ctx);
                                     } else {
                                         metadata.setState(prevState);
@@ -442,13 +446,13 @@ public class LedgerHandle implements AutoCloseable {
                                             metadata.setEnsembles(newMeta.getEnsembles());
                                             metadata.setVersion(newMeta.version);
                                             metadata.setLength(length);
-                                            metadata.close(lastAddConfirmed);
+                                            metadata.close(getLastAddConfirmed());
                                             writeLedgerConfig(new CloseCb());
                                             return;
                                         } else {
                                             metadata.setLength(length);
-                                            metadata.close(lastAddConfirmed);
-                                            LOG.warn("Conditional update ledger metadata for ledger " + ledgerId + " failed.");
+                                            metadata.close(getLastAddConfirmed());
+                                            LOG.warn("Conditional update ledger metadata for ledger {} failed.", ledgerId);
                                             cb.closeComplete(rc, LedgerHandle.this, ctx);
                                         }
                                     }
@@ -460,7 +464,7 @@ public class LedgerHandle implements AutoCloseable {
                                 }
                             });
                         } else if (rc != BKException.Code.OK) {
-                            LOG.error("Error update ledger metadata for ledger " + ledgerId + " : " + rc);
+                            LOG.error("Error update ledger metadata for ledger {} : {}", ledgerId, rc);
                             cb.closeComplete(rc, LedgerHandle.this, ctx);
                         } else {
                             cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
@@ -1184,70 +1188,89 @@ public class LedgerHandle implements AutoCloseable {
 
     }
 
-    ArrayList<BookieSocketAddress> replaceBookieInMetadata(final BookieSocketAddress addr, final int bookieIndex)
+    EnsembleInfo replaceBookieInMetadata(final Map<Integer, BookieSocketAddress> failedBookies,
+                                         int ensembleChangeIdx)
             throws BKException.BKNotEnoughBookiesException {
-        BookieSocketAddress newBookie;
-        LOG.info("Handling failure of bookie: {} index: {}", addr, bookieIndex);
         final ArrayList<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>();
-        final long newEnsembleStartEntry = lastAddConfirmed + 1;
-
-        // avoid parallel ensemble changes to same ensemble.
+        final long newEnsembleStartEntry = getLastAddConfirmed() + 1;
+        final HashSet<Integer> replacedBookies = new HashSet<Integer>();
         synchronized (metadata) {
             newEnsemble.addAll(metadata.currentEnsemble);
-            newBookie = bk.bookieWatcher.replaceBookie(metadata.getEnsembleSize(),
-                    metadata.getWriteQuorumSize(),
-                    metadata.getAckQuorumSize(),
-                    metadata.getCustomMetadata(),
-                    newEnsemble,
-                    bookieIndex, new HashSet<>(Arrays.asList(addr)));
-
-
-            newEnsemble.set(bookieIndex, newBookie);
-
+            for (Map.Entry<Integer, BookieSocketAddress> entry : failedBookies.entrySet()) {
+                int idx = entry.getKey();
+                BookieSocketAddress addr = entry.getValue();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[EnsembleChange-L{}-{}] : replacing bookie: {} index: {}",
+                        new Object[]{getId(), ensembleChangeIdx, addr, idx});
+                }
+                if (!newEnsemble.get(idx).equals(addr)) {
+                    // ensemble has already changed, failure of this addr is immaterial
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Write did not succeed to {}, bookieIndex {}, but we have already fixed it.",
+                                  addr, idx);
+                    }
+                    continue;
+                }
+                try {
+                    BookieSocketAddress newBookie = bk.bookieWatcher.replaceBookie(
+                        metadata.getEnsembleSize(),
+                        metadata.getWriteQuorumSize(),
+                        metadata.getAckQuorumSize(),
+                        metadata.getCustomMetadata(),
+                        newEnsemble,
+                        idx,
+                        new HashSet<BookieSocketAddress>(failedBookies.values()));
+                    newEnsemble.set(idx, newBookie);
+                    replacedBookies.add(idx);
+                } catch (BKException.BKNotEnoughBookiesException e) {
+                    // if there is no bookie replaced, we throw not enough bookie exception
+                    if (replacedBookies.size() <= 0) {
+                        throw e;
+                    } else {
+                        break;
+                    }
+                }
+            }
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Changing ensemble from: " + metadata.currentEnsemble
-                        + " to: " + newEnsemble + " for ledger: " + ledgerId
-                        + " starting at entry: " + (lastAddConfirmed + 1));
+                LOG.debug("[EnsembleChange-L{}-{}] : changing ensemble from: {} to: {} starting at entry: {}," +
+                    " failed bookies: {}, replaced bookies: {}",
+                      new Object[] { ledgerId, ensembleChangeIdx, metadata.currentEnsemble, newEnsemble,
+                              (getLastAddConfirmed() + 1), failedBookies, replacedBookies });
             }
-
             metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
         }
-        return newEnsemble;
+        return new EnsembleInfo(newEnsemble, failedBookies, replacedBookies);
     }
 
-    void handleBookieFailure(final BookieSocketAddress addr, final int bookieIndex) {
-        // If this is the first failure,
-        // try to submit completed pendingAddOps before this failure.
-        if (0 == blockAddCompletions.get()) {
-            sendAddSuccessCallbacks();
+    void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
+        int curBlockAddCompletions = blockAddCompletions.incrementAndGet();
+
+        if (bk.disableEnsembleChangeFeature.isAvailable()) {
+            blockAddCompletions.decrementAndGet();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Ensemble change is disabled. Retry sending to failed bookies {} for ledger {}.",
+                    failedBookies, ledgerId);
+            }
+            unsetSuccessAndSendWriteRequest(failedBookies.keySet());
+            return;
         }
 
-        blockAddCompletions.incrementAndGet();
+        int curNumEnsembleChanges = numEnsembleChanges.incrementAndGet();
 
         synchronized (metadata) {
-            if (!metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
-                // ensemble has already changed, failure of this addr is immaterial
-                LOG.warn("Write did not succeed to {}, bookieIndex {}, but we have already fixed it.",
-                         addr, bookieIndex);
-                blockAddCompletions.decrementAndGet();
-
-                // Try to submit completed pendingAddOps, pending by this fix.
-                if (0 == blockAddCompletions.get()) {
-                    sendAddSuccessCallbacks();
-                }
-
-                return;
-            }
-
             try {
-                ArrayList<BookieSocketAddress> newEnsemble = replaceBookieInMetadata(addr, bookieIndex);
-
-                EnsembleInfo ensembleInfo = new EnsembleInfo(newEnsemble, bookieIndex,
-                                                             addr);
-                writeLedgerConfig(new ChangeEnsembleCb(ensembleInfo));
+                EnsembleInfo ensembleInfo = replaceBookieInMetadata(failedBookies, curNumEnsembleChanges);
+                if (ensembleInfo.replacedBookies.isEmpty()) {
+                    blockAddCompletions.decrementAndGet();
+                    return;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[EnsembleChange-L{}-{}] : writing new ensemble info = {}, block add completions = {}",
+                        new Object[]{getId(), curNumEnsembleChanges, ensembleInfo, curBlockAddCompletions});
+                }
+                writeLedgerConfig(new ChangeEnsembleCb(ensembleInfo, curBlockAddCompletions, curNumEnsembleChanges));
             } catch (BKException.BKNotEnoughBookiesException e) {
-                LOG.error("Could not get additional bookie to "
-                          + "remake ensemble, closing ledger: " + ledgerId);
+                LOG.error("Could not get additional bookie to remake ensemble, closing ledger: {}", ledgerId);
                 handleUnrecoverableErrorDuringAdd(e.getCode());
                 return;
             }
@@ -1255,16 +1278,26 @@ public class LedgerHandle implements AutoCloseable {
     }
 
     // Contains newly reformed ensemble, bookieIndex, failedBookieAddress
-    private static final class EnsembleInfo {
+    static final class EnsembleInfo {
         private final ArrayList<BookieSocketAddress> newEnsemble;
-        private final int bookieIndex;
-        private final BookieSocketAddress addr;
+        private final Map<Integer, BookieSocketAddress> failedBookies;
+        final Set<Integer> replacedBookies;
 
-        public EnsembleInfo(ArrayList<BookieSocketAddress> newEnsemble, int bookieIndex,
-                            BookieSocketAddress addr) {
+        public EnsembleInfo(ArrayList<BookieSocketAddress> newEnsemble,
+                            Map<Integer, BookieSocketAddress> failedBookies,
+                            Set<Integer> replacedBookies) {
             this.newEnsemble = newEnsemble;
-            this.bookieIndex = bookieIndex;
-            this.addr = addr;
+            this.failedBookies = failedBookies;
+            this.replacedBookies = replacedBookies;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("Ensemble Info : failed bookies = ").append(failedBookies)
+                    .append(", replaced bookies = ").append(replacedBookies)
+                    .append(", new ensemble = ").append(newEnsemble);
+            return sb.toString();
         }
     }
 
@@ -1275,10 +1308,16 @@ public class LedgerHandle implements AutoCloseable {
      */
     private final class ChangeEnsembleCb extends OrderedSafeGenericCallback<Void> {
         private final EnsembleInfo ensembleInfo;
+        private final int curBlockAddCompletions;
+        private final int ensembleChangeIdx;
 
-        ChangeEnsembleCb(EnsembleInfo ensembleInfo) {
+        ChangeEnsembleCb(EnsembleInfo ensembleInfo,
+                         int curBlockAddCompletions,
+                         int ensembleChangeIdx) {
             super(bk.mainWorkerPool, ledgerId);
             this.ensembleInfo = ensembleInfo;
+            this.curBlockAddCompletions = curBlockAddCompletions;
+            this.ensembleChangeIdx = ensembleChangeIdx;
         }
 
         @Override
@@ -1287,30 +1326,39 @@ public class LedgerHandle implements AutoCloseable {
                 // We changed the ensemble, but got a version exception. We
                 // should still consider this as an ensemble change
                 ensembleChangeCounter.inc();
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.info("[EnsembleChange-L{}-{}] : encountered version conflicts, re-read ledger metadata.",
+                        getId(), ensembleChangeIdx);
+                }
+
                 rereadMetadata(new ReReadLedgerMetadataCb(rc,
-                                       ensembleInfo));
+                                       ensembleInfo, curBlockAddCompletions, ensembleChangeIdx));
                 return;
             } else if (rc != BKException.Code.OK) {
-                LOG.error("Could not persist ledger metadata while "
-                          + "changing ensemble to: "
-                          + ensembleInfo.newEnsemble
-                          + " , closing ledger");
+                LOG.error("[EnsembleChange-L{}-{}] : could not persist ledger metadata : info = {}, closing ledger : {}.",
+                        new Object[] { getId(), ensembleChangeIdx, ensembleInfo, rc });
                 handleUnrecoverableErrorDuringAdd(rc);
                 return;
             }
-            blockAddCompletions.decrementAndGet();
+            int newBlockAddCompletions = blockAddCompletions.decrementAndGet();
+
+            if (LOG.isDebugEnabled()) {
+                LOG.info("[EnsembleChange-L{}-{}] : completed ensemble change, block add completion {} => {}",
+                    new Object[]{getId(), ensembleChangeIdx, curBlockAddCompletions, newBlockAddCompletions});
+            }
 
             // We've successfully changed an ensemble
             ensembleChangeCounter.inc();
             // the failed bookie has been replaced
-            unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
+            unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies);
         }
 
         @Override
         public String toString() {
             return String.format("ChangeEnsemble(%d)", ledgerId);
         }
-    };
+    }
 
     /**
      * Callback which is reading the ledgerMetadata present in zk. This will try
@@ -1319,29 +1367,32 @@ public class LedgerHandle implements AutoCloseable {
     private final class ReReadLedgerMetadataCb extends OrderedSafeGenericCallback<LedgerMetadata> {
         private final int rc;
         private final EnsembleInfo ensembleInfo;
+        private final int curBlockAddCompletions;
+        private final int ensembleChangeIdx;
 
-        ReReadLedgerMetadataCb(int rc, EnsembleInfo ensembleInfo) {
+        ReReadLedgerMetadataCb(int rc,
+                               EnsembleInfo ensembleInfo,
+                               int curBlockAddCompletions,
+                               int ensembleChangeIdx) {
             super(bk.mainWorkerPool, ledgerId);
             this.rc = rc;
             this.ensembleInfo = ensembleInfo;
+            this.curBlockAddCompletions = curBlockAddCompletions;
+            this.ensembleChangeIdx = ensembleChangeIdx;
         }
 
         @Override
         public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
             if (newrc != BKException.Code.OK) {
-                LOG.error("Error reading new metadata from ledger "
-                        + "after changing ensemble, code=" + newrc);
+                LOG.error("[EnsembleChange-L{}-{}] : error re-reading metadata to address ensemble change conflicts," +
+                        " code=", new Object[] { ledgerId, ensembleChangeIdx, newrc });
                 handleUnrecoverableErrorDuringAdd(rc);
             } else {
                 if (!resolveConflict(newMeta)) {
-                    LOG.error("Could not resolve ledger metadata conflict "
-                            + "while changing ensemble to: "
-                            + ensembleInfo.newEnsemble
-                            + ", old meta data is \n"
-                            + new String(metadata.serialize(), UTF_8)
-                            + "\n, new meta data is \n"
-                            + new String(newMeta.serialize(), UTF_8)
-                            + "\n ,closing ledger");
+                    LOG.error("[EnsembleChange-L{}-{}] : could not resolve ledger metadata conflict" +
+                            " while changing ensemble to: {}, local meta data is \n {} \n," +
+                            " zk meta data is \n {} \n, closing ledger",
+                            new Object[] { ledgerId, ensembleChangeIdx, ensembleInfo.newEnsemble, metadata, newMeta });
                     handleUnrecoverableErrorDuringAdd(rc);
                 }
             }
@@ -1362,8 +1413,17 @@ public class LedgerHandle implements AutoCloseable {
          * </p>
          */
         private boolean resolveConflict(LedgerMetadata newMeta) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[EnsembleChange-L{}-{}] : resolving conflicts - local metadata = \n {} \n," +
+                    " zk metadata = \n {} \n", new Object[]{ledgerId, ensembleChangeIdx, metadata, newMeta});
+            }
             // make sure the ledger isn't closed by other ones.
             if (metadata.getState() != newMeta.getState()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.info("[EnsembleChange-L{}-{}] : resolving conflicts but state changed," +
+                            " local metadata = \n {} \n, zk metadata = \n {} \n",
+                        new Object[]{ledgerId, ensembleChangeIdx, metadata, newMeta});
+                }
                 return false;
             }
 
@@ -1375,6 +1435,11 @@ public class LedgerHandle implements AutoCloseable {
             //           than the metadata changed by recovery.
             int diff = newMeta.getEnsembles().size() - metadata.getEnsembles().size();
             if (0 != diff) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[EnsembleChange-L{}-{}] : resolving conflicts but ensembles have {} differences," +
+                            " local metadata = \n {} \n, zk metadata = \n {} \n",
+                        new Object[]{ledgerId, ensembleChangeIdx, diff, metadata, newMeta});
+                }
                 if (-1 == diff) {
                     // Case 1: metadata is changed by other ones (e.g. Recovery)
                     return updateMetadataIfPossible(newMeta);
@@ -1389,45 +1454,76 @@ public class LedgerHandle implements AutoCloseable {
             // the ensemble change of the failed bookie is failed due to metadata conflicts. so try to
             // update the ensemble change metadata again. Otherwise, it means that the ensemble change
             // is already succeed, unset the success and re-adding entries.
-            if (newMeta.currentEnsemble.get(ensembleInfo.bookieIndex).equals(
-                    ensembleInfo.addr)) {
+            if (!areFailedBookiesReplaced(newMeta, ensembleInfo)) {
                 // If the in-memory data doesn't contains the failed bookie, it means the ensemble change
                 // didn't finish, so try to resolve conflicts with the metadata read from zookeeper and
                 // update ensemble changed metadata again.
-                if (!metadata.currentEnsemble.get(ensembleInfo.bookieIndex)
-                        .equals(ensembleInfo.addr)) {
+                if (areFailedBookiesReplaced(metadata, ensembleInfo)) {
                     return updateMetadataIfPossible(newMeta);
                 }
             } else {
                 ensembleChangeCounter.inc();
                 // We've successfully changed an ensemble
                 // the failed bookie has been replaced
-                blockAddCompletions.decrementAndGet();
-                unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
+                int newBlockAddCompletions = blockAddCompletions.decrementAndGet();
+                unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies);
+                if (LOG.isDebugEnabled()) {
+                    LOG.info("[EnsembleChange-L{}-{}] : resolved conflicts, block add complectiosn {} => {}.",
+                        new Object[]{ledgerId, ensembleChangeIdx, curBlockAddCompletions, newBlockAddCompletions});
+                }
             }
             return true;
         }
 
+        /**
+         * Check whether every slot in {@code ensembleInfo.replacedBookies} no longer holds its failed bookie.
+         *
+         * @param newMeta
+         *          ledger metadata whose current ensemble is inspected
+         * @param ensembleInfo
+         *          ensemble info providing the replaced slots and the failed bookie per slot
+         * @return true if all failed bookies are replaced, false otherwise
+         */
+        private boolean areFailedBookiesReplaced(LedgerMetadata newMeta, EnsembleInfo ensembleInfo) {
+            boolean allReplaced = true;
+            for (Integer slot : ensembleInfo.replacedBookies) {
+                BookieSocketAddress failedAddr = ensembleInfo.failedBookies.get(slot);
+                BookieSocketAddress currentAddr = newMeta.currentEnsemble.get(slot);
+                allReplaced = !Objects.equal(currentAddr, failedAddr) && allReplaced;
+            }
+            return allReplaced;
+        }
+
         private boolean updateMetadataIfPossible(LedgerMetadata newMeta) {
             // if the local metadata is newer than zookeeper metadata, it means that metadata is updated
             // again when it was trying re-reading the metatada, re-kick the reread again
             if (metadata.isNewerThan(newMeta)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[EnsembleChange-L{}-{}] : reread metadata because local metadata is newer.",
+                        new Object[]{ledgerId, ensembleChangeIdx});
+                }
                 rereadMetadata(this);
                 return true;
             }
             // make sure the metadata doesn't changed by other ones.
             if (metadata.isConflictWith(newMeta)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[EnsembleChange-L{}-{}] : metadata is conflicted, local metadata = \n {} \n," +
+                        " zk metadata = \n {} \n", new Object[]{ledgerId, ensembleChangeIdx, metadata, newMeta});
+                }
                 return false;
             }
-            LOG.info("Resolve ledger metadata conflict while changing ensemble to: {},"
-                    + " old meta data is \n {} \n, new meta data is \n {}.", new Object[] {
-                    ensembleInfo.newEnsemble, metadata, newMeta });
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[EnsembleChange-L{}-{}] : resolved ledger metadata conflict and writing to zookeeper,"
+                        + " local meta data is \n {} \n, zk meta data is \n {}.",
+                    new Object[]{ledgerId, ensembleChangeIdx, metadata, newMeta});
+            }
             // update znode version
             metadata.setVersion(newMeta.getVersion());
             // merge ensemble infos from new meta except last ensemble
             // since they might be modified by recovery tool.
             metadata.mergeEnsembles(newMeta.getEnsembles());
-            writeLedgerConfig(new ChangeEnsembleCb(ensembleInfo));
+            writeLedgerConfig(new ChangeEnsembleCb(ensembleInfo, curBlockAddCompletions, ensembleChangeIdx));
             return true;
         }
 
@@ -1435,11 +1531,13 @@ public class LedgerHandle implements AutoCloseable {
         public String toString() {
             return String.format("ReReadLedgerMetadata(%d)", ledgerId);
         }
-    };
+    }
 
-    void unsetSuccessAndSendWriteRequest(final int bookieIndex) {
+    void unsetSuccessAndSendWriteRequest(final Set<Integer> bookies) {
         for (PendingAddOp pendingAddOp : pendingAddOps) {
-            pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
+            for (Integer bookieIndex: bookies) {
+                pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
+            }
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 8c86885..130e1e2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -23,9 +23,11 @@ import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -36,7 +38,6 @@ import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 
 /**
  * This represents a pending add operation. When it has got success from all
@@ -168,8 +169,10 @@ class PendingAddOp implements WriteCallback, TimerTask {
         // if we had already heard a success from this array index, need to
         // increment our number of responses that are pending, since we are
         // going to unset this success
-        ackSet.removeBookie(bookieIndex);
-        completed = false;
+        if (!ackSet.removeBookieAndCheck(bookieIndex)) {
+            // unset completed if this results in loss of ack quorum
+            completed = false;
+        }
 
         sendWriteRequest(bookieIndex);
     }
@@ -198,7 +201,36 @@ class PendingAddOp implements WriteCallback, TimerTask {
     public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
         int bookieIndex = (Integer) ctx;
 
+        if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
+            // ensemble has already changed, failure of this addr is immaterial
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Write did not succeed: " + ledgerId + ", " + entryId + ". But we have already fixed it.");
+            }
+            return;
+        }
+
+        // must record all acks, even if complete (completion can be undone by an ensemble change)
+        boolean ackQuorum = false;
+        if (BKException.Code.OK == rc) {
+            ackQuorum = ackSet.completeBookieAndCheck(bookieIndex);
+        }
+
         if (completed) {
+            // The add operation may already be marked completed: we don't reset the completed flag to false when
+            // #unsetSuccessAndSendWriteRequest doesn't break the ack quorum constraint. That can leave the pending
+            // add op completed without its callback ever being sent, so try to send the success callbacks again.
+            //
+            // E.g. entry x is going to complete.
+            //
+            // 1) entry x + k hits a failure. lh.handleBookieFailure increases blockAddCompletions to 1, for ensemble change
+            // 2) entry x receives all responses, sets completed to true but fails to send success callback because
+            //    blockAddCompletions is 1
+            // 3) ensemble change completes. lh unsets success for entries x through x+k, but since the unset doesn't
+            //    break the ackSet constraint, #removeBookieAndCheck doesn't set completed back to false.
+            // 4) so when the retry request on new bookie completes, it finds the pending op is already completed.
+            //    we have to trigger #sendAddSuccessCallbacks
+            //
+            sendAddSuccessCallbacks();
             // I am already finished, ignore incoming responses.
             // otherwise, we might hit the following error handling logic, which might cause bad things.
             return;
@@ -223,29 +255,39 @@ class PendingAddOp implements WriteCallback, TimerTask {
             lh.handleUnrecoverableErrorDuringAdd(rc);
             return;
         default:
-            LOG.warn("Write did not succeed: L{} E{} on {}, rc = {}",
-                     new Object[] { ledgerId, entryId, addr, rc });
-            lh.handleBookieFailure(addr, bookieIndex);
-            return;
-        }
-
-        if (!writeSet.contains(bookieIndex)) {
-            LOG.warn("Received a response for (lid:{}, eid:{}) from {}@{}, but it doesn't belong to {}.",
-                     new Object[] { ledgerId, entryId, addr, bookieIndex, writeSet });
+            if (lh.bk.delayEnsembleChange) {
+                if (ackSet.failBookieAndCheck(bookieIndex, addr) || rc == BKException.Code.WriteOnReadOnlyBookieException) {
+                    Map<Integer, BookieSocketAddress> failedBookies = ackSet.getFailedBookies();
+                    LOG.warn("Failed to write entry ({}, {}) to bookies {}, handling failures.",
+                             new Object[] { ledgerId, entryId, failedBookies });
+                    // we can't meet ack quorum requirement, trigger ensemble change.
+                    lh.handleBookieFailure(failedBookies);
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Failed to write entry ({}, {}) to bookie ({}, {})," +
+                                  " but it didn't break ack quorum, delaying ensemble change : {}",
+                                  new Object[] { ledgerId, entryId, bookieIndex, addr, BKException.getMessage(rc) });
+                    }
+                }
+            } else {
+                LOG.warn("Failed to write entry ({}, {}): {}",
+                         new Object[] { ledgerId, entryId, BKException.getMessage(rc) });
+                lh.handleBookieFailure(ImmutableMap.of(bookieIndex, addr));
+            }
             return;
         }
 
-        if (ackSet.addBookieAndCheck(bookieIndex) && !completed) {
+        if (ackQuorum && !completed) {
             completed = true;
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Complete (lid:{}, eid:{}).", ledgerId, entryId);
-            }
-            // when completed an entry, try to send success add callbacks in order
-            lh.sendAddSuccessCallbacks();
+            sendAddSuccessCallbacks();
         }
     }
 
+    void sendAddSuccessCallbacks() {
+        lh.sendAddSuccessCallbacks();
+    }
+
     void submitCallback(final int rc) {
         if (null != timeout) {
             timeout.cancel();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index c4e1046..94923e0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -75,6 +75,10 @@ class PendingReadLacOp implements ReadLacCallback {
     public void readLacComplete(int rc, long ledgerId, final ByteBuf lacBuffer, final ByteBuf lastEntryBuffer,
             Object ctx) {
         int bookieIndex = (Integer) ctx;
+
+        // add the response to coverage set
+        coverageSet.addBookie(bookieIndex, rc);
+
         numResponsesPending--;
         boolean heardValidResponse = false;
 
@@ -127,7 +131,7 @@ class PendingReadLacOp implements ReadLacCallback {
 
         // We don't consider a success until we have coverage set responses.
         if (heardValidResponse
-                && coverageSet.addBookieAndCheckCovered(bookieIndex)
+                && coverageSet.checkCovered()
                 && !completed) {
             completed = true;
             if (LOG.isDebugEnabled()) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index 45c3898..ea9ee3b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -98,7 +98,7 @@ class PendingWriteLacOp implements WriteLacCallback {
         receivedResponseSet.remove(bookieIndex);
 
         if (rc == BKException.Code.OK) {
-            if (ackSet.addBookieAndCheck(bookieIndex) && !completed) {
+            if (ackSet.completeBookieAndCheck(bookieIndex) && !completed) {
                 completed = true;
                 cb.addLacComplete(rc, lh, ctx);
                 return;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index 3d5df84..75264c6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -80,6 +80,9 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
             final ByteBuf buffer, final Object ctx) {
         int bookieIndex = (Integer) ctx;
 
+        // add the response to coverage set
+        coverageSet.addBookie(bookieIndex, rc);
+
         numResponsesPending--;
         boolean heardValidResponse = false;
         if (rc == BKException.Code.OK) {
@@ -116,7 +119,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
 
         // other return codes dont count as valid responses
         if (heardValidResponse
-            && coverageSet.addBookieAndCheckCovered(bookieIndex)
+            && coverageSet.checkCovered()
             && !completed) {
             completed = true;
             if (LOG.isDebugEnabled()) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 37227dd..297ea32 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -21,6 +21,8 @@
 package org.apache.bookkeeper.client;
 
 import java.security.GeneralSecurityException;
+import java.util.Map;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -30,8 +32,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataLis
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.versioning.Version;
 
-import java.util.concurrent.RejectedExecutionException;
-
 /**
  * Read only ledger handle. This ledger handle allows you to
  * read from a ledger but not to write to it. It overrides all
@@ -118,25 +118,19 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
     }
 
     @Override
-    void handleBookieFailure(final BookieSocketAddress addr, final int bookieIndex) {
+    void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
         blockAddCompletions.incrementAndGet();
         synchronized (metadata) {
             try {
-                if (!metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
-                    // ensemble has already changed, failure of this addr is immaterial
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Write did not succeed to {}, bookieIndex {},"
-                                +" but we have already fixed it.", addr, bookieIndex);
-                    }
+                EnsembleInfo ensembleInfo = replaceBookieInMetadata(failedBookies,
+                        numEnsembleChanges.incrementAndGet());
+                if (ensembleInfo.replacedBookies.isEmpty()) {
                     blockAddCompletions.decrementAndGet();
                     return;
                 }
-
-                replaceBookieInMetadata(addr, bookieIndex);
-
                 blockAddCompletions.decrementAndGet();
                 // the failed bookie has been replaced
-                unsetSuccessAndSendWriteRequest(bookieIndex);
+                unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies);
             } catch (BKException.BKNotEnoughBookiesException e) {
                 LOG.error("Could not get additional bookie to "
                           + "remake ensemble, closing ledger: " + ledgerId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index 82f300b..4eaf37b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -17,11 +17,14 @@
  */
 package org.apache.bookkeeper.client;
 
-import org.apache.bookkeeper.util.MathUtils;
+import com.google.common.collect.ImmutableMap;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Map;
 
 /**
  * A specific {@link DistributionSchedule} that places entries in round-robin
@@ -54,40 +57,71 @@ class RoundRobinDistributionSchedule implements DistributionSchedule {
     @Override
     public AckSet getAckSet() {
         final HashSet<Integer> ackSet = new HashSet<Integer>();
+        final HashMap<Integer, BookieSocketAddress> failureMap =
+                new HashMap<Integer, BookieSocketAddress>();
         return new AckSet() {
-            public boolean addBookieAndCheck(int bookieIndexHeardFrom) {
+            public boolean completeBookieAndCheck(int bookieIndexHeardFrom) {
+                failureMap.remove(bookieIndexHeardFrom);
                 ackSet.add(bookieIndexHeardFrom);
                 return ackSet.size() >= ackQuorumSize;
             }
 
-            public void removeBookie(int bookie) {
+            @Override
+            public boolean failBookieAndCheck(int bookieIndexHeardFrom, BookieSocketAddress address) {
+                ackSet.remove(bookieIndexHeardFrom);
+                failureMap.put(bookieIndexHeardFrom, address);
+                return failureMap.size() > (writeQuorumSize - ackQuorumSize);
+            }
+
+            @Override
+            public Map<Integer, BookieSocketAddress> getFailedBookies() {
+                return ImmutableMap.copyOf(failureMap);
+            }
+
+            public boolean removeBookieAndCheck(int bookie) {
                 ackSet.remove(bookie);
+                failureMap.remove(bookie);
+                return ackSet.size() >= ackQuorumSize;
             }
         };
     }
 
     private class RRQuorumCoverageSet implements QuorumCoverageSet {
-        private final boolean[] covered = new boolean[ensembleSize];
+        private final int[] covered = new int[ensembleSize];
 
         private RRQuorumCoverageSet() {
             for (int i = 0; i < covered.length; i++) {
-                covered[i] = false;
+                covered[i] = BKException.Code.UNINITIALIZED;
             }
         }
 
-        public synchronized boolean addBookieAndCheckCovered(int bookieIndexHeardFrom) {
-            covered[bookieIndexHeardFrom] = true;
+        @Override
+        public synchronized void addBookie(int bookieIndexHeardFrom, int rc) {
+            covered[bookieIndexHeardFrom] = rc;
+        }
 
+        @Override
+        public synchronized boolean checkCovered() {
             // now check if there are any write quorums, with |ackQuorum| nodes available
             for (int i = 0; i < ensembleSize; i++) {
                 int nodesNotCovered = 0;
+                int nodesOkay = 0;
+                int nodesUninitialized = 0;
                 for (int j = 0; j < writeQuorumSize; j++) {
                     int nodeIndex = (i + j) % ensembleSize;
-                    if (!covered[nodeIndex]) {
+                    if (covered[nodeIndex] == BKException.Code.OK) {
+                        nodesOkay++;
+                    } else if (covered[nodeIndex] != BKException.Code.NoSuchEntryException &&
+                            covered[nodeIndex] != BKException.Code.NoSuchLedgerExistsException) {
                         nodesNotCovered++;
+                    } else if (covered[nodeIndex] == BKException.Code.UNINITIALIZED) {
+                        nodesUninitialized++;
                     }
                 }
-                if (nodesNotCovered >= ackQuorumSize) {
+                // if no OK response was seen and some nodes are not heard from yet, wait for more responses.
+                // NOTE(review): UNINITIALIZED matches the branch above first, so nodesUninitialized looks always 0.
+                if (nodesNotCovered >= ackQuorumSize ||
+                        (nodesOkay == 0 && nodesUninitialized > 0)) {
                     return false;
                 }
             }
@@ -99,7 +133,7 @@ class RoundRobinDistributionSchedule implements DistributionSchedule {
     public QuorumCoverageSet getCoverageSet() {
         return new RRQuorumCoverageSet();
     }
-    
+
     @Override
     public boolean hasEntry(long entryId, int bookieIndex) {
         return getWriteSet(entryId).contains(bookieIndex);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 32b21aa..038437a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.conf;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENSEMBLE_CHANGE;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -67,6 +68,8 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String SPECULATIVE_READ_TIMEOUT_BACKOFF_MULTIPLIER = "speculativeReadTimeoutBackoffMultiplier";
     protected final static String ENABLE_PARALLEL_RECOVERY_READ = "enableParallelRecoveryRead";
     protected final static String RECOVERY_READ_BATCH_SIZE = "recoveryReadBatchSize";
+    // Add Parameters
+    protected final static String DELAY_ENSEMBLE_CHANGE = "delayEnsembleChange";
     // Timeout Setting
     protected final static String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec";
     protected final static String ADD_ENTRY_QUORUM_TIMEOUT_SEC = "addEntryQuorumTimeoutSec";
@@ -100,6 +103,9 @@ public class ClientConfiguration extends AbstractConfiguration {
     // Stats
     protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
     protected final static String TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros";
+    
+    // Names of dynamic features
+    protected final static String DISABLE_ENSEMBLE_CHANGE_FEATURE_NAME = "disableEnsembleChangeFeatureName";
 
     // Role of the client
     protected final static String CLIENT_ROLE = "clientRole";
@@ -1207,4 +1213,50 @@ public class ClientConfiguration extends AbstractConfiguration {
         return getString(CLIENT_ROLE, CLIENT_ROLE_STANDARD);
     }
 
+    /**
+     * Whether ensemble change is delayed; reads {@code delayEnsembleChange} and defaults to false.
+     *
+     * @return true if ensemble change is delayed, otherwise false.
+     */
+    public boolean getDelayEnsembleChange() {
+        return getBoolean(DELAY_ENSEMBLE_CHANGE, false);
+    }
+
+    /**
+     * Enable/Disable delaying ensemble change.
+     * <p>
+     * If set to true, ensemble change only happens when the ack quorum
+     * requirement can no longer be met. If set to false, ensemble change
+     * happens immediately when a write fails.
+     * </p>
+     *
+     * @param enabled
+     *          flag to enable/disable delaying ensemble change.
+     * @return this client configuration.
+     */
+    public ClientConfiguration setDelayEnsembleChange(boolean enabled) {
+        setProperty(DELAY_ENSEMBLE_CHANGE, enabled);
+        return this;
+    }
+
+    /**
+     * Get the name of the dynamic feature that disables ensemble change.
+     *
+     * @return the configured feature name, or {@code FEATURE_DISABLE_ENSEMBLE_CHANGE} when none is set
+     */
+    public String getDisableEnsembleChangeFeatureName() {
+        return getString(DISABLE_ENSEMBLE_CHANGE_FEATURE_NAME, FEATURE_DISABLE_ENSEMBLE_CHANGE);
+    }
+
+    /**
+     * Set the name of the dynamic feature that disables ensemble change.
+     *
+     * @param disableEnsembleChangeFeatureName
+     *          name of the dynamic feature that disables ensemble change
+     * @return this client configuration.
+     */
+    public ClientConfiguration setDisableEnsembleChangeFeatureName(String disableEnsembleChangeFeatureName) {
+        setProperty(DISABLE_ENSEMBLE_CHANGE_FEATURE_NAME, disableEnsembleChangeFeatureName);
+        return this;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 4fb08e4..b25a3b8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -25,7 +25,6 @@ import static com.google.common.base.Charsets.UTF_8;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -52,7 +51,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ExtensionRegistry;
 
 import io.netty.buffer.ByteBuf;
@@ -210,14 +208,42 @@ public class BookieClient implements PerChannelBookieClientFactory {
         }
     }
 
-    public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
-            final long entryId, final ByteBuf toSend, final WriteCallback cb, final Object ctx, final int options) {
+    /**
+     * Run {@code cb.writeComplete} via {@code executor.submitOrdered}; on rejection call it inline.
+     */
+    private void completeAdd(final int rc, final long ledgerId, final long entryId,
+                             final BookieSocketAddress addr, final WriteCallback cb, final Object ctx) {
+        final SafeRunnable deliver = new SafeRunnable() {
+            @Override
+            public void safeRun() {
+                cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+            }
+            @Override
+            public String toString() {
+                return String.format("CompleteWrite(ledgerId=%d, entryId=%d, addr=%s)", ledgerId, entryId, addr);
+            }
+        };
+        try {
+            executor.submitOrdered(ledgerId, deliver);
+        } catch (RejectedExecutionException rejected) {
+            cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx);
+        }
+    }
+
+    public void addEntry(final BookieSocketAddress addr,
+                         final long ledgerId,
+                         final byte[] masterKey,
+                         final long entryId,
+                         final ByteBuf toSend,
+                         final WriteCallback cb,
+                         final Object ctx,
+                         final int options) {
         closeLock.readLock().lock();
         try {
             final PerChannelBookieClientPool client = lookupClient(addr, entryId);
             if (client == null) {
-                cb.writeComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                                 ledgerId, entryId, addr, ctx);
+                completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
+                            ledgerId, entryId, addr, cb, ctx);
                 return;
             }
 
@@ -229,17 +255,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                 @Override
                 public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
                     if (rc != BKException.Code.OK) {
-                        try {
-                            executor.submitOrdered(ledgerId, new SafeRunnable() {
-                                @Override
-                                public void safeRun() {
-                                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
-                                }
-                            });
-                        } catch (RejectedExecutionException re) {
-                            cb.writeComplete(getRc(BKException.Code.InterruptedException),
-                                    ledgerId, entryId, addr, ctx);
-                        }
+                        completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
                     } else {
                         pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options);
                     }
@@ -250,6 +266,25 @@ public class BookieClient implements PerChannelBookieClientFactory {
             closeLock.readLock().unlock();
         }
     }
+    
+    /**
+     * Run {@code cb.readEntryComplete} via {@code executor.submitOrdered}; on rejection call it inline.
+     */
+    private void completeRead(final int rc, final long ledgerId, final long entryId,
+                              final ByteBuf entry, final ReadEntryCallback cb, final Object ctx) {
+        final SafeRunnable deliver = new SafeRunnable() {
+            @Override
+            public void safeRun() {
+                cb.readEntryComplete(rc, ledgerId, entryId, entry, ctx);
+            }
+        };
+        try {
+            executor.submitOrdered(ledgerId, deliver);
+        } catch (RejectedExecutionException rejected) {
+            cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
+                                 ledgerId, entryId, entry, ctx);
+        }
+    }
 
     public void readEntryAndFenceLedger(final BookieSocketAddress addr,
                                         final long ledgerId,
@@ -261,8 +296,8 @@ public class BookieClient implements PerChannelBookieClientFactory {
         try {
             final PerChannelBookieClientPool client = lookupClient(addr, entryId);
             if (client == null) {
-                cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                                     ledgerId, entryId, null, ctx);
+                completeRead(getRc(BKException.Code.BookieHandleNotAvailableException),
+                             ledgerId, entryId, null, cb, ctx);
                 return;
             }
 
@@ -270,17 +305,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                 @Override
                 public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
                     if (rc != BKException.Code.OK) {
-                        try {
-                            executor.submitOrdered(ledgerId, new SafeRunnable() {
-                                @Override
-                                public void safeRun() {
-                                    cb.readEntryComplete(rc, ledgerId, entryId, null, ctx);
-                                }
-                            });
-                        } catch (RejectedExecutionException re) {
-                            cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
-                                    ledgerId, entryId, null, ctx);
-                        }
+                        completeRead(rc, ledgerId, entryId, null, cb, ctx);
                         return;
                     }
                     pcbc.readEntryAndFenceLedger(ledgerId, masterKey, entryId, cb, ctx);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6c5d68f..3905083 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1775,6 +1775,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             case EFENCED:
                 rcToRet = BKException.Code.LedgerFencedException;
                 break;
+            case EREADONLY:
+                rcToRet = BKException.Code.WriteOnReadOnlyBookieException;
+                break;
             default:
                 break;
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index 8dc7e2d..bae7715 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -52,4 +52,5 @@ public class BookKeeperConstants {
     public static final long MAX_LOG_SIZE_LIMIT = 1 * 1024 * 1024 * 1024;
 
     public static final String FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT = "repp_disable_durability_enforcement";
+    public static final String FEATURE_DISABLE_ENSEMBLE_CHANGE = "disable_ensemble_change";
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index ae0a07f..3b307e1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -29,7 +29,7 @@ import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,7 +57,8 @@ import static org.junit.Assert.*;
 /**
  * This class tests the bookie recovery admin functionality.
  */
-public class BookieRecoveryTest extends MultiLedgerManagerMultiDigestTestCase {
+public class BookieRecoveryTest extends BookKeeperClusterTestCase {
+
     private final static Logger LOG = LoggerFactory.getLogger(BookieRecoveryTest.class);
 
     // Object used for synchronizing async method calls
@@ -93,10 +94,11 @@ public class BookieRecoveryTest extends MultiLedgerManagerMultiDigestTestCase {
     BookKeeperAdmin bkAdmin;
 
     // Constructor
-    public BookieRecoveryTest(String ledgerManagerFactory, DigestType digestType) {
+    public BookieRecoveryTest() {
         super(3);
-        this.digestType = digestType;
-        this.ledgerManagerFactory = ledgerManagerFactory;
+
+        this.digestType = DigestType.CRC32;
+        this.ledgerManagerFactory = "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory";
         LOG.info("Using ledger manager " + ledgerManagerFactory);
         // set ledger manager
         baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
@@ -241,6 +243,23 @@ public class BookieRecoveryTest extends MultiLedgerManagerMultiDigestTestCase {
      */
     @Test(timeout = 60000)
     public void testMetadataConflictWithRecovery() throws Exception {
+        metadataConflictWithRecovery(bkc);
+    }
+
+    @Test(timeout = 60000)
+    public void testMetadataConflictWhenDelayingEnsembleChange() throws Exception {
+        ClientConfiguration newConf = new ClientConfiguration(baseClientConf);
+        newConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        newConf.setDelayEnsembleChange(true);
+        BookKeeper newBkc = new BookKeeper(newConf);
+        try {
+            metadataConflictWithRecovery(newBkc);
+        } finally {
+            newBkc.close();
+        }
+    }
+
+    void metadataConflictWithRecovery(BookKeeper bkc) throws Exception {
         int numEntries = 10;
         byte[] data = "testMetadataConflictWithRecovery".getBytes();
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
index 12e3c17..327b642 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
@@ -43,10 +43,10 @@ public class RoundRobinDistributionScheduleTest {
         assertEquals("Write set is wrong size", wSet.size(), 3);
 
         DistributionSchedule.AckSet ackSet = schedule.getAckSet();
-        assertFalse("Shouldn't ack yet", ackSet.addBookieAndCheck(wSet.get(0)));
-        assertFalse("Shouldn't ack yet", ackSet.addBookieAndCheck(wSet.get(0)));
-        assertTrue("Should ack after 2 unique", ackSet.addBookieAndCheck(wSet.get(2)));
-        assertTrue("Should still be acking", ackSet.addBookieAndCheck(wSet.get(1)));
+        assertFalse("Shouldn't ack yet", ackSet.completeBookieAndCheck(wSet.get(0)));
+        assertFalse("Shouldn't ack yet", ackSet.completeBookieAndCheck(wSet.get(0)));
+        assertTrue("Should ack after 2 unique", ackSet.completeBookieAndCheck(wSet.get(2)));
+        assertTrue("Should still be acking", ackSet.completeBookieAndCheck(wSet.get(1)));
     }
 
     /**
@@ -113,10 +113,10 @@ public class RoundRobinDistributionScheduleTest {
         int errors = 0;
         for (Set<Integer> subset : subsets) {
             DistributionSchedule.QuorumCoverageSet covSet = schedule.getCoverageSet();
-            boolean covSetSays = false;
             for (Integer i : subset) {
-                covSetSays = covSet.addBookieAndCheckCovered(i);
+                covSet.addBookie(i, BKException.Code.OK);
             }
+            boolean covSetSays = covSet.checkCovered();
 
             boolean[] nodesAvailable = buildAvailable(ensemble, subset);
             boolean canGetAck = canGetAckQuorum(ensemble, writeQuorum, ackQuorum, nodesAvailable);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
new file mode 100644
index 0000000..8c8e60b
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -0,0 +1,417 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
+
+    final static Logger logger = LoggerFactory.getLogger(TestDelayEnsembleChange.class);
+
+    final DigestType digestType;
+    final byte[] testPasswd = "".getBytes();
+
+    public TestDelayEnsembleChange() {
+        super(5);
+        this.digestType = DigestType.CRC32;
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        baseClientConf.setDelayEnsembleChange(true);
+        super.setUp();
+    }
+
+    private static class VerificationCallback implements ReadEntryCallback {
+        final CountDownLatch latch;
+        final AtomicLong numSuccess;
+        final AtomicLong numMissing;
+        final AtomicLong numFailure;
+
+        VerificationCallback(int numRequests) {
+            latch = new CountDownLatch(numRequests);
+            numSuccess = new AtomicLong(0L);
+            numMissing = new AtomicLong(0L);
+            numFailure = new AtomicLong(0L);
+        }
+
+        @Override
+        public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) {
+            if (rc == BKException.Code.OK) {
+                numSuccess.incrementAndGet();
+            } else if (rc == BKException.Code.NoSuchEntryException || rc == BKException.Code.NoSuchLedgerExistsException) {
+                logger.error("Missed entry({}, {}) from host {}.", new Object[] { ledgerId, entryId, ctx });
+                numMissing.incrementAndGet();
+            } else {
+                logger.error("Failed to get entry({}, {}) from host {} : {}",
+                             new Object[] { ledgerId, entryId, ctx, rc });
+                numFailure.incrementAndGet();
+            }
+            latch.countDown();
+        }
+    }
+
+    private void verifyEntries(LedgerHandle lh, long startEntry, long untilEntry,
+                               long expectedSuccess, long expectedMissing) throws Exception {
+        LedgerMetadata md = lh.getLedgerMetadata();
+
+        for (long eid = startEntry; eid < untilEntry; eid++) {
+            ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
+            VerificationCallback callback = new VerificationCallback(addresses.size());
+            for (BookieSocketAddress addr : addresses) {
+                bkc.bookieClient.readEntry(addr, lh.getId(), eid, callback, addr);
+            }
+            callback.latch.await();
+            assertEquals(expectedSuccess, callback.numSuccess.get());
+            assertEquals(expectedMissing, callback.numMissing.get());
+            assertEquals(0, callback.numFailure.get());
+        }
+    }
+
+    private void verifyEntriesRange(LedgerHandle lh, long startEntry, long untilEntry,
+                                    long expectedSuccess, long expectedMissing) throws Exception {
+        LedgerMetadata md = lh.getLedgerMetadata();
+
+        for (long eid = startEntry; eid < untilEntry; eid++) {
+            ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
+            VerificationCallback callback = new VerificationCallback(addresses.size());
+            for (BookieSocketAddress addr : addresses) {
+                bkc.bookieClient.readEntry(addr, lh.getId(), eid, callback, addr);
+            }
+            callback.latch.await();
+            assertTrue(expectedSuccess >= callback.numSuccess.get());
+            assertTrue(expectedMissing <= callback.numMissing.get());
+            assertEquals(0, callback.numFailure.get());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testNotChangeEnsembleIfNotBrokenAckQuorum() throws Exception {
+        LedgerHandle lh = bkc.createLedger(5, 5, 3, digestType, testPasswd);
+
+        byte[] data = "foobar".getBytes();
+
+        int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // kill two bookies, but we still have 3 bookies for the ack quorum.
+        ServerConfiguration conf0 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(0));
+        ServerConfiguration conf1 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(1));
+
+        for (int i = numEntries; i < 2 * numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // ensure the ensemble has not changed
+        assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
+                     1, lh.getLedgerMetadata().getEnsembles().size());
+
+        bsConfs.add(conf0);
+        bs.add(startBookie(conf0));
+        bsConfs.add(conf1);
+        bs.add(startBookie(conf1));
+
+        for (int i = 2 * numEntries; i < 3 * numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // ensure the ensemble has not changed
+        assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
+                     1, lh.getLedgerMetadata().getEnsembles().size());
+
+        // check entries
+        verifyEntries(lh, 0, numEntries, 5, 0);
+        verifyEntries(lh, numEntries, 2 * numEntries, 3, 2);
+        verifyEntries(lh, 2 * numEntries, 3 * numEntries, 5, 0);
+    }
+
+    @Test(timeout = 60000)
+    public void testChangeEnsembleIfBrokenAckQuorum() throws Exception {
+        startNewBookie();
+        startNewBookie();
+        startNewBookie();
+
+        LedgerHandle lh = bkc.createLedger(5, 5, 3, digestType, testPasswd);
+
+        byte[] data = "foobar".getBytes();
+
+        int numEntries = 5;
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        logger.info("Kill bookie 0 and write {} entries.", numEntries);
+
+        // kill one bookie; the 4 remaining bookies still satisfy the ack quorum of 3.
+        ServerConfiguration conf0 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(0));
+
+        for (int i = numEntries; i < 2 * numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // ensure the ensemble has not changed
+        assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
+                     1, lh.getLedgerMetadata().getEnsembles().size());
+
+        logger.info("Kill bookie 1 and write another {} entries.", numEntries);
+
+        ServerConfiguration conf1 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(1));
+
+        for (int i = 2 * numEntries; i < 3 * numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // ensure the ensemble has not changed
+        assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
+                     1, lh.getLedgerMetadata().getEnsembles().size());
+
+        logger.info("Kill bookie 2 and write another {} entries.", numEntries);
+
+        ServerConfiguration conf2 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(2));
+
+        for (int i = 3 * numEntries; i < 4 * numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // ensemble change should kick in
+        assertEquals("There should be ensemble change if ack quorum couldn't be formed.",
+                     2, lh.getLedgerMetadata().getEnsembles().size());
+
+        ArrayList<BookieSocketAddress> firstFragment = lh.getLedgerMetadata().getEnsemble(0);
+        ArrayList<BookieSocketAddress> secondFragment = lh.getLedgerMetadata().getEnsemble(3 * numEntries);
+        assertFalse(firstFragment.get(0).equals(secondFragment.get(0)));
+        assertFalse(firstFragment.get(1).equals(secondFragment.get(1)));
+        assertFalse(firstFragment.get(2).equals(secondFragment.get(2)));
+        assertEquals(firstFragment.get(3), secondFragment.get(3));
+        assertEquals(firstFragment.get(4), secondFragment.get(4));
+
+        bsConfs.add(conf0);
+        bs.add(startBookie(conf0));
+        bsConfs.add(conf1);
+        bs.add(startBookie(conf1));
+        bsConfs.add(conf2);
+        bs.add(startBookie(conf2));
+
+        for (int i = 4 * numEntries; i < 5 * numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // ensure the ensemble has not changed again
+        assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
+                     2, lh.getLedgerMetadata().getEnsembles().size());
+
+        // check entries
+        verifyEntries(lh, 0, numEntries, 5, 0);
+        verifyEntries(lh, numEntries, 2 * numEntries, 4, 1);
+        verifyEntries(lh, 2 * numEntries, 3 * numEntries, 3, 2);
+        verifyEntries(lh, 3 * numEntries, 4 * numEntries, 5, 0);
+        verifyEntries(lh, 4 * numEntries, 5 * numEntries, 5, 0);
+    }
+
+    @Test(timeout = 60000)
+    public void testEnsembleChangeWithNotEnoughBookies() throws Exception {
+        startNewBookie();
+
+        LedgerHandle lh = bkc.createLedger(5, 5, 3, digestType, testPasswd);
+
+        byte[] data = "foobar".getBytes();
+
+        int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        logger.info("Killed 3 bookies and add {} more entries : {}", numEntries, lh.getLedgerMetadata());
+
+        // kill three bookies, but we only have 1 new bookie for ensemble change.
+        ServerConfiguration conf0 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(0));
+        ServerConfiguration conf1 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(1));
+        ServerConfiguration conf2 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(2));
+
+        for (int i = numEntries; i < 2 * numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        logger.info("Ledger metadata after killed bookies : {}", lh.getLedgerMetadata());
+
+        // ensure the ensemble has changed
+        assertEquals("There should be ensemble change if ack quorum is broken.",
+                     2, lh.getLedgerMetadata().getEnsembles().size());
+
+        bsConfs.add(conf0);
+        bs.add(startBookie(conf0));
+        bsConfs.add(conf1);
+        bs.add(startBookie(conf1));
+        bsConfs.add(conf2);
+        bs.add(startBookie(conf2));
+
+        for (int i = 2 * numEntries; i < 3 * numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // ensure the ensemble has not changed again
+        assertEquals("There should be no ensemble change after adding failed bookies back.",
+                     2, lh.getLedgerMetadata().getEnsembles().size());
+
+        // check entries
+        verifyEntries(lh, 0, numEntries, 5, 0);
+        verifyEntries(lh, numEntries, 2 * numEntries, 3, 2);
+        verifyEntries(lh, 2 * numEntries, 3 * numEntries, 5, 0);
+    }
+
+    @Test(timeout = 60000)
+    public void testEnsembleChangeWithMoreBookieFailures() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            startNewBookie();
+        }
+
+        LedgerHandle lh = bkc.createLedger(5, 5, 3, digestType, testPasswd);
+
+        byte[] data = "foobar".getBytes();
+
+        int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            logger.info("Add entry {}", i);
+            lh.addEntry(data);
+        }
+
+        logger.info("Killed 5 bookies and add {} more entries : {}", numEntries, lh.getLedgerMetadata());
+
+        // kill 5 bookies to introduce more bookie failure
+        List<ServerConfiguration> confs = new ArrayList<ServerConfiguration>(5);
+        for (int i = 0; i < 5; i++) {
+            confs.add(killBookie(lh.getLedgerMetadata().currentEnsemble.get(i)));
+        }
+
+        for (int i = numEntries; i < 2 * numEntries; i++) {
+            logger.info("Add entry {}", i);
+            lh.addEntry(data);
+        }
+
+        logger.info("Ledger metadata after killed bookies : {}", lh.getLedgerMetadata());
+
+        // ensure the ensemble has changed
+        assertEquals("There should be ensemble change if breaking ack quorum.",
+                     2, lh.getLedgerMetadata().getEnsembles().size());
+
+        for (ServerConfiguration conf : confs) {
+            bsConfs.add(conf);
+            bs.add(startBookie(conf));
+        }
+
+        for (int i = 2 * numEntries; i < 3 * numEntries; i++) {
+            logger.info("Add entry {}", i);
+            lh.addEntry(data);
+        }
+
+        // ensure the ensemble has not changed again
+        assertEquals("There should not be ensemble changed if delaying ensemble change is enabled.",
+                     2, lh.getLedgerMetadata().getEnsembles().size());
+
+        // check entries
+        verifyEntries(lh, 0, numEntries, 5, 0);
+        verifyEntriesRange(lh, numEntries, 2 * numEntries, 5, 0);
+        verifyEntries(lh, 2 * numEntries, 3 * numEntries, 5, 0);
+    }
+
+    @Test(timeout = 60000)
+    public void testChangeEnsembleIfBookieReadOnly() throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, 2, digestType, testPasswd);
+
+        byte[] data = "foobar".getBytes();
+
+        int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // set the first bookie in the ensemble to read-only.
+        setBookieToReadOnly(lh.getLedgerMetadata().currentEnsemble.get(0));
+
+        for (int i = numEntries; i < 2 * numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // ensure the ensemble has changed
+        assertEquals("The ensemble should change when a bookie is readonly even if we delay ensemble change.",
+            2, lh.getLedgerMetadata().getEnsembles().size());
+
+    }
+
+    @Test(timeout = 60000)
+    public void testChangeEnsembleSecondBookieReadOnly() throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, 2, digestType, testPasswd);
+
+        byte[] data = "foobar".getBytes();
+
+        int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        BookieSocketAddress failedBookie = lh.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress readOnlyBookie = lh.getLedgerMetadata().currentEnsemble.get(1);
+        ServerConfiguration conf0 = killBookie(failedBookie);
+
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        assertEquals("There should be ensemble change if delaying ensemble change is enabled.",
+            1, lh.getLedgerMetadata().getEnsembles().size());
+
+        // set another bookie to read-only while the killed bookie is still down.
+        setBookieToReadOnly(readOnlyBookie);
+
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(data);
+        }
+
+        // ensure the ensemble has changed and no longer contains either bookie
+        assertEquals("The ensemble should change when a bookie is readonly even if we delay ensemble change.",
+            2, lh.getLedgerMetadata().getEnsembles().size());
+        assertEquals(3, lh.getLedgerMetadata().currentEnsemble.size());
+        assertFalse(lh.getLedgerMetadata().currentEnsemble.contains(failedBookie));
+        assertFalse(lh.getLedgerMetadata().currentEnsemble.contains(readOnlyBookie));
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
new file mode 100644
index 0000000..b45d434
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
@@ -0,0 +1,277 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.feature.SettableFeature;
+import org.apache.bookkeeper.feature.SettableFeatureProvider;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.util.BookKeeperConstants.*;
+import static org.junit.Assert.*;
+
+/**
+ * Test Case on Disabling Ensemble Change Feature
+ */
+public class TestDisableEnsembleChange extends BookKeeperClusterTestCase {
+
+    static final Logger logger = LoggerFactory.getLogger(TestDisableEnsembleChange.class);
+
+    public TestDisableEnsembleChange() {
+        super(4);
+    }
+
+    @Test(timeout = 60000)
+    public void testDisableEnsembleChange() throws Exception {
+        disableEnsembleChangeTest(true);
+    }
+
+    @Test(timeout = 60000)
+    public void testDisableEnsembleChangeNotEnoughBookies() throws Exception {
+        disableEnsembleChangeTest(false);
+    }
+
+    void disableEnsembleChangeTest(boolean startNewBookie) throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setZkServers(zkUtil.getZooKeeperConnectString())
+            .setDelayEnsembleChange(false)
+            .setDisableEnsembleChangeFeatureName(FEATURE_DISABLE_ENSEMBLE_CHANGE);
+
+        SettableFeatureProvider featureProvider = new SettableFeatureProvider("test", 0);
+        BookKeeper bkc = BookKeeper.forConfig(conf)
+                .featureProvider(featureProvider)
+                .build();
+
+        SettableFeature disableEnsembleChangeFeature = featureProvider.getFeature(FEATURE_DISABLE_ENSEMBLE_CHANGE);
+        disableEnsembleChangeFeature.set(true);
+
+        final byte[] password = new byte[0];
+        final LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, password);
+        final AtomicBoolean finished = new AtomicBoolean(false);
+        final AtomicBoolean failTest = new AtomicBoolean(false);
+        final byte[] entry = "test-disable-ensemble-change".getBytes(UTF_8);
+
+        assertEquals(1, lh.getLedgerMetadata().getEnsembles().size());
+        ArrayList<BookieSocketAddress> ensembleBeforeFailure =
+                new ArrayList<BookieSocketAddress>(lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next().getValue());
+
+        final RateLimiter rateLimiter = RateLimiter.create(10);
+
+        Thread addThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    while (!finished.get()) {
+                        rateLimiter.acquire();
+                        lh.addEntry(entry);
+                    }
+                } catch (Exception e) {
+                    logger.error("Exception on adding entry : ", e);
+                    failTest.set(true);
+                }
+            }
+        };
+        addThread.start();
+        Thread.sleep(2000);
+        killBookie(0);
+        Thread.sleep(2000);
+        finished.set(true);
+        addThread.join();
+
+        assertFalse("Should not fail adding entries facing one bookie failure when disable ensemble change",
+                failTest.get());
+
+        // check the ensemble after failure
+        assertEquals("No new ensemble should be added when disable ensemble change.",
+                1, lh.getLedgerMetadata().getEnsembles().size());
+        ArrayList<BookieSocketAddress> ensembleAfterFailure =
+                new ArrayList<BookieSocketAddress>(lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next().getValue());
+        assertArrayEquals(ensembleBeforeFailure.toArray(new BookieSocketAddress[ensembleBeforeFailure.size()]),
+                ensembleAfterFailure.toArray(new BookieSocketAddress[ensembleAfterFailure.size()]));
+
+        // enable ensemble change
+        disableEnsembleChangeFeature.set(false);
+        if (startNewBookie) {
+            startNewBookie();
+        }
+
+        // reset add thread
+        finished.set(false);
+        final CountDownLatch failLatch = new CountDownLatch(1);
+
+        addThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    while (!finished.get()) {
+                        lh.addEntry(entry);
+                    }
+                } catch (Exception e) {
+                    logger.error("Exception on adding entry : ", e);
+                    failLatch.countDown();
+                    failTest.set(true);
+                }
+            }
+        };
+        addThread.start();
+        failLatch.await(4000, TimeUnit.MILLISECONDS);
+        finished.set(true);
+        addThread.join();
+
+        if (startNewBookie) {
+            assertFalse("Should not fail adding entries when enable ensemble change again.",
+                    failTest.get());
+            assertFalse("Ledger should be closed when enable ensemble change again.",
+                    lh.getLedgerMetadata().isClosed());
+            assertEquals("New ensemble should be added when enable ensemble change again.",
+                    2, lh.getLedgerMetadata().getEnsembles().size());
+        } else {
+            assertTrue("Should fail adding entries when enable ensemble change again.",
+                    failTest.get());
+            assertTrue("Ledger should be closed when enable ensemble change again.",
+                    lh.getLedgerMetadata().isClosed());
+        }
+    }
+
+    /**
+     * Verify that, while ensemble change is disabled, an add issued after one
+     * bookie has been killed neither completes nor fails, and that it completes
+     * successfully once the same bookie is started again.
+     *
+     * @throws Exception on any unexpected failure while driving the test cluster
+     */
+    @Test(timeout=20000)
+    public void testRetryFailureBookie() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setZkServers(zkUtil.getZooKeeperConnectString())
+            .setDelayEnsembleChange(false)
+            .setDisableEnsembleChangeFeatureName(FEATURE_DISABLE_ENSEMBLE_CHANGE);
+
+        SettableFeatureProvider featureProvider = new SettableFeatureProvider("test", 0);
+        BookKeeper bkc = BookKeeper.forConfig(conf)
+                .featureProvider(featureProvider)
+                .build();
+
+        // turn the "disable ensemble change" feature on before any ledger is created
+        SettableFeature disableEnsembleChangeFeature = featureProvider.getFeature(FEATURE_DISABLE_ENSEMBLE_CHANGE);
+        disableEnsembleChangeFeature.set(true);
+
+        // presumably ensemble, write-quorum and ack-quorum sizes are all 4, so
+        // every add must be acknowledged by all four bookies (confirm against
+        // the createLedger API)
+        LedgerHandle lh = bkc.createLedger(4, 4, 4, BookKeeper.DigestType.CRC32, new byte[] {});
+        byte[] entry = "testRetryFailureBookie".getBytes();
+        for (int i=0; i<10; i++) {
+            lh.addEntry(entry);
+        }
+        // kill a bookie, keeping its configuration so it can be restarted below
+        ServerConfiguration killedConf = killBookie(0);
+
+        // 0xdeadbeef is a sentinel meaning "the callback has not run yet"
+        final AtomicInteger res = new AtomicInteger(0xdeadbeef);
+        final CountDownLatch addLatch = new CountDownLatch(1);
+        AsyncCallback.AddCallback cb = new AsyncCallback.AddCallback() {
+            @Override
+            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                logger.info("Add entry {} completed : rc {}.", entryId, rc);
+                res.set(rc);
+                addLatch.countDown();
+            }
+        };
+        lh.asyncAddEntry(entry, cb, null);
+        assertFalse("Add entry operation should not complete.",
+                addLatch.await(1000, TimeUnit.MILLISECONDS));
+        // JUnit order is (expected, actual)
+        assertEquals(0xdeadbeef, res.get());
+        // start the original bookie
+        bsConfs.add(killedConf);
+        bs.add(startBookie(killedConf));
+        assertTrue("Add entry operation should complete at this point.",
+                addLatch.await(1000, TimeUnit.MILLISECONDS));
+        assertEquals(BKException.Code.OK, res.get());
+    }
+
+    /**
+     * Verify that, while ensemble change is disabled, an add issued while one
+     * bookie of the ensemble is suspended stays pending across several timeout
+     * periods, and that it completes successfully once the bookie is woken up.
+     *
+     * @throws Exception on any unexpected failure while driving the test cluster
+     */
+    @Test(timeout=20000)
+    public void testRetrySlowBookie() throws Exception {
+        // timeout in seconds, applied to both read and add requests below
+        final int readTimeout = 2;
+
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setZkServers(zkUtil.getZooKeeperConnectString())
+            .setReadEntryTimeout(readTimeout)
+            .setAddEntryTimeout(readTimeout)
+            .setDelayEnsembleChange(false)
+            .setDisableEnsembleChangeFeatureName(FEATURE_DISABLE_ENSEMBLE_CHANGE);
+
+        SettableFeatureProvider featureProvider = new SettableFeatureProvider("test", 0);
+        BookKeeper bkc = BookKeeper.forConfig(conf)
+                .featureProvider(featureProvider)
+                .build();
+
+        // turn the "disable ensemble change" feature on before any ledger is created
+        SettableFeature disableEnsembleChangeFeature = featureProvider.getFeature(FEATURE_DISABLE_ENSEMBLE_CHANGE);
+        disableEnsembleChangeFeature.set(true);
+
+        // presumably ensemble, write-quorum and ack-quorum sizes are all 4, so
+        // every add must be acknowledged by all four bookies (confirm against
+        // the createLedger API)
+        LedgerHandle lh = bkc.createLedger(4, 4, 4, BookKeeper.DigestType.CRC32, new byte[] {});
+        byte[] entry = "testRetrySlowBookie".getBytes();
+        for (int i=0; i<10; i++) {
+            lh.addEntry(entry);
+        }
+
+        List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+
+        // suspend the third bookie of the current ensemble and wait until the
+        // suspension has actually taken effect
+        final CountDownLatch wakeupLatch = new CountDownLatch(1);
+        final CountDownLatch suspendLatch = new CountDownLatch(1);
+        sleepBookie(curEns.get(2), wakeupLatch, suspendLatch);
+
+        suspendLatch.await();
+
+        // 0xdeadbeef is a sentinel meaning "the callback has not run yet"
+        final AtomicInteger res = new AtomicInteger(0xdeadbeef);
+        final CountDownLatch addLatch = new CountDownLatch(1);
+        AsyncCallback.AddCallback cb = new AsyncCallback.AddCallback() {
+            @Override
+            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                logger.info("Add entry {} completed : rc {}.", entryId, rc);
+                res.set(rc);
+                addLatch.countDown();
+            }
+        };
+        lh.asyncAddEntry(entry, cb, null);
+        assertFalse("Add entry operation should not complete.",
+                addLatch.await(1000, TimeUnit.MILLISECONDS));
+        // JUnit order is (expected, actual)
+        assertEquals(0xdeadbeef, res.get());
+        // wait out one full timeout period (add timeout == readTimeout seconds)
+        assertFalse("Add entry operation should not complete even timeout.",
+                addLatch.await(readTimeout, TimeUnit.SECONDS));
+        assertEquals(0xdeadbeef, res.get());
+        // wait one more timeout period, so that more than one retry has been
+        // sent while the bookie is still suspended
+        assertFalse("Add entry operation should not complete even timeout.",
+                addLatch.await(readTimeout, TimeUnit.SECONDS));
+        assertEquals(0xdeadbeef, res.get());
+        // wake up the sleeping bookie
+        wakeupLatch.countDown();
+        assertTrue("Add entry operation should complete at this point.",
+                addLatch.await(1000, TimeUnit.MILLISECONDS));
+        assertEquals(BKException.Code.OK, res.get());
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index a71dcb5..437c794 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -24,6 +24,7 @@ package org.apache.bookkeeper.test;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -203,6 +204,7 @@ public abstract class BookKeeperClusterTestCase {
         ServerConfiguration conf = new ServerConfiguration(baseConf);
         conf.setBookiePort(port);
         conf.setZkServers(zkServers);
+        conf.setAllowLoopback(true);
         conf.setJournalDirName(journalDir.getPath());
         String[] ledgerDirNames = new String[ledgerDirs.length];
         for (int i=0; i<ledgerDirs.length; i++) {
@@ -269,6 +271,23 @@ public abstract class BookKeeperClusterTestCase {
     }
 
     /**
+     * Transition the bookie identified by its socket address to read-only mode.
+     * Only the first matching server in {@code bs} is affected; no match is a no-op.
+     *
+     * @param addr socket address of the bookie to switch to read-only mode
+     * @throws InterruptedException declared by the signature; presumably from the read-only transition (confirm)
+     * @throws UnknownHostException declared by the signature; presumably from resolving the local address (confirm)
+     */
+    public void setBookieToReadOnly(BookieSocketAddress addr) throws InterruptedException, UnknownHostException {
+        for (BookieServer server : bs) {
+            if (server.getLocalAddress().equals(addr)) {
+                server.getBookie().doTransitionToReadOnlyMode();
+                break;
+            }
+        }
+    }
+
+    /**
      * Kill a bookie by index. Also, stops the respective auto recovery process
      * for this bookie, if isAutoRecoveryEnabled is true.
      *
@@ -338,13 +357,25 @@ public abstract class BookKeeperClusterTestCase {
      * @throws IOException
      */
     public void sleepBookie(BookieSocketAddress addr, final CountDownLatch l)
-            throws Exception {
+            throws InterruptedException, IOException {
+        final CountDownLatch suspendLatch = new CountDownLatch(1); // counted down by the sleeper thread right after it suspends the bookie
+        sleepBookie(addr, l, suspendLatch); // the three-argument overload does the actual work
+        suspendLatch.await(); // return only after suspendProcessing() has run; NOTE(review): would block forever if the overload returned without matching addr - confirm it throws instead
+    }
+
+    public void sleepBookie(BookieSocketAddress addr, final CountDownLatch l, final CountDownLatch suspendLatch)
+            throws InterruptedException, IOException {
         for (final BookieServer bookie : bs) {
             if (bookie.getLocalAddress().equals(addr)) {
-                bookie.suspendProcessing();
+                LOG.info("Sleep bookie {}.", addr);
                 Thread sleeper = new Thread() {
+                    @Override
                     public void run() {
                         try {
+                            bookie.suspendProcessing();
+                            if (null != suspendLatch) {
+                                suspendLatch.countDown();
+                            }
                             l.await();
                             bookie.resumeProcessing();
                         } catch (Exception e) {
diff --git a/bookkeeper-server/src/test/resources/log4j.properties b/bookkeeper-server/src/test/resources/log4j.properties
index 096c20d..25c5250 100644
--- a/bookkeeper-server/src/test/resources/log4j.properties
+++ b/bookkeeper-server/src/test/resources/log4j.properties
@@ -42,12 +42,17 @@ log4j.appender.CONSOLE.Threshold=INFO
 log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
 
+# Disable ZooKeeper logging; keep the bookie and meta packages at INFO
+log4j.logger.org.apache.zookeeper=OFF
+log4j.logger.org.apache.bookkeeper.bookie=INFO
+log4j.logger.org.apache.bookkeeper.meta=INFO
+
 #
 # Add ROLLINGFILE to rootLogger to get log file output
 #    Log DEBUG level and above messages to a log file
 log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.ROLLINGFILE.Threshold=DEBUG
-log4j.appender.ROLLINGFILE.File=bookkeeper-server.log
+log4j.appender.ROLLINGFILE.File=target/bookkeeper-server.log
 log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
 log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
 

-- 
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>.

Mime
View raw message