bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [bookkeeper] branch master updated: BOOKKEEPER-1102: Clarify BookieInfoReader and fix associated test flappers
Date Tue, 01 Aug 2017 06:34:19 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a5f8580  BOOKKEEPER-1102: Clarify BookieInfoReader and fix associated test flappers
a5f8580 is described below

commit a5f8580f53464065243a9af038935f5893434166
Author: Samuel Just <sjust@salesforce.com>
AuthorDate: Mon Jul 31 23:34:07 2017 -0700

    BOOKKEEPER-1102: Clarify BookieInfoReader and fix associated test flappers
    
    BookieInfoReader:
    
    The previous syncronization logic wasn't really correct, and the logic
    at the top of the method was far more complicated than it needed to be.
    Restrict bookies to be non-null.  Restructure the code to simply use
    the BookieInfoReader instance as a single lock.
    
    One significant behavioral change is that we scan every bookie not in
    the map, and we clear from the map bookies which returned an error.
    
    Also, explicitely cache the most recent bookie set reported by the
    BookieWatcher.  This eliminates the need to call into BookieWatcher
    from getReadWriteBookieInfo and the corresponding error path.  The
    periodic scan continues to explicitely check.
    
    Another departure is the addition of an explicit retry-on-error param to
    trigger retry if any of the requests failed
    (getBookieInfoRetryIntervalSeconds).  We'll only retry the ones that
    actually failed (along with any new additions since the last run).  This
    is useful because bookie startup triggers the addition of the bookie
    node to zk before the bookie actually becomes available for the bookie
    info request, so there can be rare races in the unit tests between
    BookieInfoReader requesting the info and the bookie actually being up.
    
    Also, add a method to allow tests to wait for updates to be reflected.
    
    PerChannelBookieClient: fix error handling for BookieInfo keys
    
    Passing a key corresponding to a GET_BOOKIE_INFO operation to
    errorOutReadKey results in a casting exception, clean up the invalid
    calls.
    
    BookKeeperClusterTestCase: add killBookieAndWaitForZK
    
    Should reduce the need for tests to wait for an arbitrary period to let
    the cluster "settle".
    
    BookKeeperDiskSpaceWeightedLedgerPlacementTest:
    
    This test was heavily time dependent, and the Thread.sleep values did
    not work universally.  Instead, eliminate the arbitrary Thread.sleep
    values and instead verify the free space changes on each change.
    
    Also, switch the delay on
    testDiskSpaceWeightedBookieSelectionWithPeriodicBookieInfoUpdate
    to simply use an atomic boolean to signal the value switch.
    
    Signed-off-by: Samuel Just <sjustsalesforce.com>
    
    Author: Samuel Just <sjust@salesforce.com>
    
    Reviewers: Enrico Olivelli <eolivelli@gmail.com>
    
    This patch had conflicts when merged, resolved by
    Committer: Sijie Guo <sijie@apache.org>
    
    This closes #275 from athanatos/forupstream/BOOKKEEPER-1102
---
 .../apache/bookkeeper/client/BookieInfoReader.java | 368 +++++++++++++++------
 .../bookkeeper/conf/ClientConfiguration.java       |  24 ++
 .../bookkeeper/proto/PerChannelBookieClient.java   |   9 +-
 ...KeeperDiskSpaceWeightedLedgerPlacementTest.java | 226 +++++++------
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  20 ++
 5 files changed, 440 insertions(+), 207 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
index 484e54b..c015efa 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
@@ -17,15 +17,17 @@
  */
 package org.apache.bookkeeper.client;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
@@ -53,13 +55,6 @@ public class BookieInfoReader {
     private final ScheduledExecutorService scheduler;
     private final BookKeeper bk;
     private final ClientConfiguration conf;
-    private ConcurrentMap<BookieSocketAddress, BookieInfo> bookieInfoMap = new ConcurrentHashMap<BookieSocketAddress, BookieInfo>();
-    private Collection<BookieSocketAddress> bookies;
-    private final AtomicInteger totalSent = new AtomicInteger();
-    private final AtomicInteger completedCnt = new AtomicInteger();
-    private final AtomicBoolean instanceRunning = new AtomicBoolean();
-    private final AtomicBoolean isQueued = new AtomicBoolean();
-    private final AtomicBoolean refreshBookieList = new AtomicBoolean();
 
     /**
      * A class represents the information (e.g. disk usage, load) of a bookie.
@@ -91,121 +86,276 @@ public class BookieInfoReader {
         }
     }
 
+
+    /**
+     * Tracks the most recently reported set of bookies from BookieWatcher as well
+     * as current BookieInfo for bookies we've successfully queried.
+     */
+    private static class BookieInfoMap {
+        /**
+         * Contains the most recently obtained information on the contained bookies.
+         * When an error happens querying a bookie, the entry is removed.
+         */
+        private final Map<BookieSocketAddress, BookieInfo> infoMap = new HashMap<>();
+
+        /**
+         * Contains the most recently reported set of bookies from BookieWatcher
+         * A partial query consists of every member of mostRecentlyReportedBookies
+         * minus the entries in bookieInfoMap.
+         */
+        private Collection<BookieSocketAddress> mostRecentlyReportedBookies = new ArrayList<>();
+
+        public void updateBookies(Collection<BookieSocketAddress> updatedBookieSet) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "updateBookies: current: {}, new: {}",
+                        mostRecentlyReportedBookies, updatedBookieSet);
+            }
+            infoMap.keySet().retainAll(updatedBookieSet);
+            mostRecentlyReportedBookies = updatedBookieSet;
+        }
+
+        @SuppressWarnings("unchecked")
+        public Collection<BookieSocketAddress> getPartialScanTargets() {
+            return CollectionUtils.subtract(mostRecentlyReportedBookies, infoMap.keySet());
+        }
+
+        public Collection<BookieSocketAddress> getFullScanTargets() {
+            return mostRecentlyReportedBookies;
+        }
+
+        /**
+         * Returns info for bookie, null if not known
+         *
+         * @param bookie bookie for which to get info
+         * @return Info for bookie, null otherwise
+         */
+        public BookieInfo getInfo(BookieSocketAddress bookie) {
+            return infoMap.get(bookie);
+        }
+
+        /**
+         * Removes bookie from bookieInfoMap
+         *
+         * @param bookie bookie on which we observed an error
+         */
+        public void clearInfo(BookieSocketAddress bookie) {
+            infoMap.remove(bookie);
+        }
+
+        /**
+         * Report new info on bookie
+         *
+         * @param bookie bookie for which we obtained new info
+         * @param info the new info
+         */
+        public void gotInfo(BookieSocketAddress bookie, BookieInfo info) {
+            infoMap.put(bookie, info);
+        }
+
+        /**
+         * Get bookie info map
+         */
+        public Map<BookieSocketAddress, BookieInfo> getBookieMap() {
+            return infoMap;
+        }
+    }
+    private final BookieInfoMap bookieInfoMap = new BookieInfoMap();
+
+    /**
+     * Tracks whether there is an execution in progress as well as whether
+     * another is pending.
+     */
+    public enum State { UNQUEUED, PARTIAL, FULL }
+    private static class InstanceState {
+        private boolean running = false;
+        private State queuedType = State.UNQUEUED;
+
+        private boolean shouldStart() {
+            if (running) {
+                return false;
+            } else {
+                running = true;
+                return true;
+            }
+        }
+
+        /**
+         * Mark pending operation FULL and return true if there is no in-progress operation
+         *
+         * @return True if we should execute a scan, False if there is already one running
+         */
+        public boolean tryStartFull() {
+            queuedType = State.FULL;
+            return shouldStart();
+        }
+
+        /**
+         * Mark pending operation PARTIAL if not full and return true if there is no in-progress operation
+         *
+         * @return True if we should execute a scan, False if there is already one running
+         */
+        public boolean tryStartPartial() {
+            if (queuedType == State.UNQUEUED) {
+                queuedType = State.PARTIAL;
+            }
+            return shouldStart();
+        }
+
+        /**
+         * Gets and clears queuedType
+         */
+        public State getAndClearQueuedType() {
+            State ret = queuedType;
+            queuedType = State.UNQUEUED;
+            return ret;
+        }
+
+        /**
+         * If queuedType != UNQUEUED, returns true, leaves running equal to true
+         * Otherwise, returns false and sets running to false
+         */
+        public boolean completeUnlessQueued() {
+            if (queuedType == State.UNQUEUED) {
+                running = false;
+                return false;
+            } else {
+                return true;
+            }
+        }
+    }
+    private final InstanceState instanceState = new InstanceState();
+
     BookieInfoReader(BookKeeper bk,
-                          ClientConfiguration conf,
-                          ScheduledExecutorService scheduler) {
+                     ClientConfiguration conf,
+                     ScheduledExecutorService scheduler) {
         this.bk = bk;
         this.conf = conf;
         this.scheduler = scheduler;
     }
-    void start() {
+
+    public void start() {
         scheduler.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Running periodic BookieInfo scan");
+                synchronized (BookieInfoReader.this) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Running periodic BookieInfo scan");
+                    }
+                    try {
+                        Collection<BookieSocketAddress> updatedBookies = bk.bookieWatcher.getBookies();
+                        bookieInfoMap.updateBookies(updatedBookies);
+                    } catch (BKException e) {
+                        LOG.info("Got exception while querying bookies from watcher, rerunning after {}s",
+                                 conf.getGetBookieInfoRetryIntervalSeconds(), e);
+                        scheduler.schedule(this, conf.getGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
+                        return;
+                    }
+                    if (instanceState.tryStartFull()) {
+                        getReadWriteBookieInfo();
+                    }
                 }
-                getReadWriteBookieInfo(null);
             }
         }, 0, conf.getGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
     }
-    void submitTask(final Collection<BookieSocketAddress> newBookies) {
-        scheduler.submit(new Runnable() {
-            @Override
-            public void run() {
-                getReadWriteBookieInfo(newBookies);
-            }
-        });
+
+    private void submitTask() {
+        scheduler.submit(() -> getReadWriteBookieInfo());
     }
-    void availableBookiesChanged(Set<BookieSocketAddress> newBookies) {
-        LOG.info("Scheduling bookie info read due to changes in available bookies.");
-        submitTask(newBookies);
+
+    private void submitTaskWithDelay(int delaySeconds) {
+        scheduler.schedule(() -> getReadWriteBookieInfo(), delaySeconds, TimeUnit.SECONDS);
     }
 
-    /*
-     * This routine is responsible for issuing bookieInfoGet messages to all the read write bookies.
-     * instanceRunning will be true until we have sent the bookieInfoGet requests to
-     * all the readwrite bookies and have processed all the callbacks. Only then is it reset to
-     * false. At that time, if any pending tasks are queued, they are scheduled by the
-     * last callback processing task. isQueued variable is used to indicate the pending
-     * tasks. refreshBookieList is used to indicate that we need to read we need to explicitly
-     * retireve the bookies list from zk because we don't remember the bookie list for
-     * queued ops.
+    synchronized void availableBookiesChanged(Set<BookieSocketAddress> updatedBookiesList) {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Scheduling bookie info read due to changes in available bookies.");
+        }
+        bookieInfoMap.updateBookies(updatedBookiesList);
+        if (instanceState.tryStartPartial()) {
+            submitTask();
+        }
+    }
+
+    /**
+     * Method to allow tests to block until bookie info is available
+     *
+     * @param bookie to lookup
+     * @return None if absent, free disk space if present
      */
-    @SuppressWarnings("unchecked")
-    void getReadWriteBookieInfo(Collection<BookieSocketAddress> newBookiesList) {
-        if (!instanceRunning.get()) {
-            instanceRunning.compareAndSet(false, true);
+    synchronized Optional<Long> getFreeDiskSpace(BookieSocketAddress bookie) {
+        BookieInfo bookieInfo = bookieInfoMap.getInfo(bookie);
+        if (bookieInfo != null) {
+            return Optional.of(bookieInfo.getFreeDiskSpace());
         } else {
-            isQueued.set(true);
-            if (newBookiesList != null) {
-                refreshBookieList.set(true);
-            }
+            return Optional.empty();
+        }
+    }
+
+    /* State to track scan execution progress as callbacks come in */
+    private int totalSent = 0;
+    private int completedCnt = 0;
+    private int errorCnt = 0;
+
+    /**
+     * Performs scan described by instanceState using the cached bookie information
+     * in bookieInfoMap.
+     */
+    synchronized void getReadWriteBookieInfo() {
+        State queuedType = instanceState.getAndClearQueuedType();
+        Collection<BookieSocketAddress> toScan;
+        if (queuedType == State.FULL) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Exiting due to running instance");
+                LOG.debug("Doing full scan");
             }
-            return;
-        }
-        Collection<BookieSocketAddress> deadBookies = null, joinedBookies=null;
-        if (newBookiesList == null) {
-            try {
-                if (this.bookies == null) {
-                    joinedBookies = this.bookies = bk.bookieWatcher.getBookies();
-                } else if (refreshBookieList.get()) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Refreshing bookie list");
-                    }
-                    newBookiesList = bk.bookieWatcher.getBookies();
-                    refreshBookieList.set(false);
-                } else {
-                    // the bookie list is already up to date, just retrieve their info
-                    joinedBookies = this.bookies;
-                }
-            } catch (BKException e) {
-                LOG.error("Unable to get the available bookies ", e);
-                onExit();
-                return;
+            toScan = bookieInfoMap.getFullScanTargets();
+        } else if (queuedType == State.PARTIAL) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Doing partial scan");
             }
-        }
-        if (newBookiesList != null) {
-            if (this.bookies != null) {
-                joinedBookies = CollectionUtils.subtract(newBookiesList, this.bookies);
-                deadBookies = CollectionUtils.subtract(this.bookies, newBookiesList);
-                for (BookieSocketAddress b : deadBookies) {
-                    bookieInfoMap.remove(b);
-                    this.bookies.remove(b);
-                }
-                this.bookies.addAll(joinedBookies);
-            } else {
-                joinedBookies = this.bookies = newBookiesList;
+            toScan = bookieInfoMap.getPartialScanTargets();
+        } else {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Invalid state, queuedType cannot be UNQUEUED in getReadWriteBookieInfo");
             }
+            assert(queuedType != State.UNQUEUED);
+            return;
         }
 
         BookieClient bkc = bk.getBookieClient();
-        totalSent.set(0);
-        completedCnt.set(0);
+        final long requested = BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE |
+                               BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
+        totalSent = 0;
+        completedCnt = 0;
+        errorCnt = 0;
+
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Getting bookie info for: {}", joinedBookies);
+            LOG.debug("Getting bookie info for: {}", toScan);
         }
-        for (BookieSocketAddress b : joinedBookies) {
-            bkc.getBookieInfo(b, GET_BOOKIE_INFO_REQUEST_FLAGS,
+        for (BookieSocketAddress b : toScan) {
+            bkc.getBookieInfo(b, requested,
                     new GetBookieInfoCallback() {
                         void processReadInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
-                            BookieSocketAddress b = (BookieSocketAddress) ctx;
-                            if (rc != BKException.Code.OK) {
-                                LOG.error("Reading bookie info from bookie {} failed due to error: {}.", b, rc);
-                                // if there was data earlier, don't overwrite it
-                                // create a new one only if the key was missing
-                                bookieInfoMap.putIfAbsent(b, new BookieInfo());
-                            } else {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Bookie Info for bookie {} is {}", b, bInfo);
+                            synchronized (BookieInfoReader.this) {
+                                BookieSocketAddress b = (BookieSocketAddress) ctx;
+                                if (rc != BKException.Code.OK) {
+                                    if (LOG.isErrorEnabled()) {
+                                        LOG.error("Reading bookie info from bookie {} failed due to error: {}.", b, rc);
+                                    }
+                                    // We reread bookies missing from the map each time, so remove to ensure
+                                    // we get to it on the next scan
+                                    bookieInfoMap.clearInfo(b);
+                                    errorCnt++;
+                                } else {
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug("Bookie Info for bookie {} is {}", b, bInfo);
+                                    }
+                                    bookieInfoMap.gotInfo(b, bInfo);
+                                }
+                                completedCnt++;
+                                if (totalSent == completedCnt) {
+                                    onExit();
                                 }
-                                bookieInfoMap.put(b, bInfo);
-                            }
-                            if (completedCnt.incrementAndGet() == totalSent.get()) {
-                                bk.placementPolicy.updateBookieInfo(bookieInfoMap);
-                                onExit();
                             }
                         }
                         @Override
@@ -219,27 +369,27 @@ public class BookieInfoReader {
                                 });
                         }
                     }, b);
-            totalSent.incrementAndGet();
+            totalSent++;
         }
-        if (totalSent.get() == 0) {
-            if (deadBookies != null) {
-                // if no new bookies joined but some existing bookies went away
-                // we need to inform the placementPloicy
-                bk.placementPolicy.updateBookieInfo(bookieInfoMap);
-            }
+        if (totalSent == 0) {
             onExit();
         }
     }
 
     void onExit() {
-        if (isQueued.get()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Scheduling a queued task");
+        bk.placementPolicy.updateBookieInfo(bookieInfoMap.getBookieMap());
+        if (errorCnt > 0) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Rescheduling in {}s due to errors", conf.getGetBookieInfoIntervalSeconds());
             }
-            submitTask(null);
+            instanceState.tryStartPartial();
+            submitTaskWithDelay(conf.getGetBookieInfoRetryIntervalSeconds());
+        } else if (instanceState.completeUnlessQueued()) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Rescheduling, another scan is pending");
+            }
+            submitTask();
         }
-        isQueued.set(false);
-        instanceRunning.set(false);
     }
 
     Map<BookieSocketAddress, BookieInfo> getBookieInfo() throws BKException, InterruptedException {
@@ -262,7 +412,9 @@ public class BookieInfoReader {
                         public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
                             BookieSocketAddress b = (BookieSocketAddress) ctx;
                             if (rc != BKException.Code.OK) {
-                                LOG.error("Reading bookie info from bookie {} failed due to error: {}.", b, rc);
+                                if (LOG.isErrorEnabled()) {
+                                    LOG.error("Reading bookie info from bookie {} failed due to error: {}.", b, rc);
+                                }
                             } else {
                                 if (LOG.isDebugEnabled()) {
                                     LOG.debug("Free disk space on bookie {} is {}.", b, bInfo.getFreeDiskSpace());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 6252786..8c2b260 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -100,6 +100,7 @@ public class ClientConfiguration extends AbstractConfiguration {
     // Bookie info poll interval
     protected final static String DISK_WEIGHT_BASED_PLACEMENT_ENABLED = "diskWeightBasedPlacementEnabled";
     protected final static String GET_BOOKIE_INFO_INTERVAL_SECONDS = "getBookieInfoIntervalSeconds";
+    protected final static String GET_BOOKIE_INFO_RETRY_INTERVAL_SECONDS = "getBookieInfoRetryIntervalSeconds";
     protected final static String BOOKIE_MAX_MULTIPLE_FOR_WEIGHTED_PLACEMENT = "bookieMaxMultipleForWeightBasedPlacement";
     protected final static String GET_BOOKIE_INFO_TIMEOUT_SECS = "getBookieInfoTimeoutSecs";
 
@@ -1240,6 +1241,16 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get the time interval between retries on unsuccessful bookie info request.  Default is
+     * 60s.
+     *
+     * @return
+     */
+    public int getGetBookieInfoRetryIntervalSeconds() {
+        return getInt(GET_BOOKIE_INFO_RETRY_INTERVAL_SECONDS, 60);
+    }
+
+    /**
      * Return whether disk weight based placement policy is enabled
      * @return
      */
@@ -1287,6 +1298,19 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Set the time interval between retries on unsuccessful GetInfo requests
+     *
+     *
+     * @param interval
+     * @param unit
+     * @return client configuration
+     */
+    public ClientConfiguration setGetBookieInfoRetryIntervalSeconds(int interval, TimeUnit unit) {
+        setProperty(GET_BOOKIE_INFO_RETRY_INTERVAL_SECONDS, unit.toSeconds(interval));
+        return this;
+    }
+
+    /**
      * Set the max multiple to use for nodes with very high weight
      * @param multiple
      * @return client configuration
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index b3dc998..d2f4dc5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -869,7 +869,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
         final Channel c = channel;
         if (c == null) {
-            errorOutReadKey(completionKey);
+            errorOutGetBookieInfoKey(completionKey);
             return;
         }
 
@@ -888,13 +888,13 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                             LOG.warn("Writing GetBookieInfoRequest(flags={}) to channel {} failed : ",
                                     new Object[] { requested, c, future.cause() });
                         }
-                        errorOutReadKey(completionKey);
+                        errorOutGetBookieInfoKey(completionKey);
                     }
                 }
             });
         } catch(Throwable e) {
             LOG.warn("Get metadata operation {} failed", getBookieInfoRequest, e);
-            errorOutReadKey(completionKey);
+            errorOutGetBookieInfoKey(completionKey);
         }
     }
 
@@ -1136,6 +1136,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 case READ_ENTRY:
                     errorOutReadKey(key, rc);
                     break;
+                case GET_BOOKIE_INFO:
+                    errorOutGetBookieInfoKey(key, rc);
+                    break;
                 default:
                     break;
             }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
index 2f885c0..3bc612e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
@@ -20,9 +20,12 @@ package org.apache.bookkeeper.client;
 *
 */
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -32,6 +35,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,13 +47,37 @@ import static org.junit.Assert.*;
  */
 public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperClusterTestCase {
     private final static Logger LOG = LoggerFactory.getLogger(BookKeeperDiskSpaceWeightedLedgerPlacementTest.class);
+    private final static long MS_WEIGHT_UPDATE_TIMEOUT = 30000;
 
     public BookKeeperDiskSpaceWeightedLedgerPlacementTest() {
         super(10);
     }
-    
-    private BookieServer restartBookie(ServerConfiguration conf, final long initialFreeDiskSpace,
-            final long finallFreeDiskSpace, final int delaySecs) throws Exception {
+
+    class BookKeeperCheckInfoReader extends BookKeeper {
+        BookKeeperCheckInfoReader(ClientConfiguration conf) throws KeeperException, IOException, InterruptedException {
+            super(conf);
+        }
+
+        void blockUntilBookieWeightIs(BookieSocketAddress bookie, Optional<Long> target) throws InterruptedException {
+            long startMsecs = System.currentTimeMillis();
+            Optional<Long> freeDiskSpace = Optional.empty();
+            while (System.currentTimeMillis() < (startMsecs + MS_WEIGHT_UPDATE_TIMEOUT)) {
+                freeDiskSpace = bookieInfoReader.getFreeDiskSpace(bookie);
+                if (freeDiskSpace.equals(target)) {
+                    return;
+                }
+                Thread.sleep(1000);
+            }
+            fail(String.format(
+                    "Server %s still has weight %s rather than %s",
+                    bookie.toString(), freeDiskSpace.toString(), target.toString()));
+        }
+    }
+
+    private BookieServer restartBookie(
+            BookKeeperCheckInfoReader client, ServerConfiguration conf, final long initialFreeDiskSpace,
+            final long finalFreeDiskSpace, final AtomicBoolean useFinal) throws Exception {
+        final AtomicBoolean ready = useFinal == null ? new AtomicBoolean(false) : useFinal;
         Bookie bookieWithCustomFreeDiskSpace = new Bookie(conf) {
             long startTime = System.currentTimeMillis();
             @Override
@@ -57,44 +85,52 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
                 if (startTime == 0) {
                     startTime = System.currentTimeMillis();
                 }
-                if (delaySecs == 0 || ((System.currentTimeMillis()) - startTime < delaySecs*1000)) {
+                if (!ready.get()) {
                     return initialFreeDiskSpace;
                 } else {
-                    // after delaySecs, advertise finallFreeDiskSpace; before that advertise initialFreeDiskSpace
-                    return finallFreeDiskSpace;
+                    // after delaySecs, advertise finalFreeDiskSpace; before that advertise initialFreeDiskSpace
+                    return finalFreeDiskSpace;
                 }
             }
         };
         bsConfs.add(conf);
         BookieServer server = startBookie(conf, bookieWithCustomFreeDiskSpace);
         bs.add(server);
+        client.blockUntilBookieWeightIs(server.getLocalAddress(), Optional.of(initialFreeDiskSpace));
+        if (useFinal == null) {
+            ready.set(true);
+        }
         return server;
     }
 
-    private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(int bookieIdx, final long freeDiskSpace)
+    private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(
+            BookKeeperCheckInfoReader client,
+            int bookieIdx, final long freeDiskSpace)
             throws Exception {
-        LOG.info("Killing bookie " + bs.get(bookieIdx).getLocalAddress());
-        bs.get(bookieIdx).getLocalAddress();
-        ServerConfiguration conf = killBookie(bookieIdx);
-        return restartBookie(conf, freeDiskSpace, freeDiskSpace, 0);
+        return replaceBookieWithCustomFreeDiskSpaceBookie(client, bookieIdx, freeDiskSpace, freeDiskSpace, null);
     }
 
-    private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(BookieServer bookie, final long freeDiskSpace)
+    private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(
+            BookKeeperCheckInfoReader client,
+            BookieServer bookie, final long freeDiskSpace)
             throws Exception {
         for (int i=0; i < bs.size(); i++) {
             if (bs.get(i).getLocalAddress().equals(bookie.getLocalAddress())) {
-                return replaceBookieWithCustomFreeDiskSpaceBookie(i, freeDiskSpace);
+                return replaceBookieWithCustomFreeDiskSpaceBookie(client, i, freeDiskSpace);
             }
         }
         return null;
     }
 
-    private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(int bookieIdx, long initialFreeDiskSpace,
-             long finalFreeDiskSpace, int delay) throws Exception {
-        LOG.info("Killing bookie " + bs.get(bookieIdx).getLocalAddress());
-        bs.get(bookieIdx).getLocalAddress();
-        ServerConfiguration conf = killBookie(bookieIdx);
-        return restartBookie(conf, initialFreeDiskSpace, finalFreeDiskSpace, delay);
+    private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(
+            BookKeeperCheckInfoReader client,
+            int bookieIdx, long initialFreeDiskSpace,
+             long finalFreeDiskSpace, AtomicBoolean useFinal) throws Exception {
+        BookieSocketAddress addr = bs.get(bookieIdx).getLocalAddress();
+        LOG.info("Killing bookie {}", addr);
+        ServerConfiguration conf = killBookieAndWaitForZK(bookieIdx);
+        client.blockUntilBookieWeightIs(addr, Optional.empty());
+        return restartBookie(client, conf, initialFreeDiskSpace, finalFreeDiskSpace, useFinal);
     }
 
     /**
@@ -104,12 +140,20 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
     public void testDiskSpaceWeightedBookieSelection() throws Exception {
         long freeDiskSpace=1000000L;
         int multiple=3;
+
+        ClientConfiguration conf = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setDiskWeightBasedPlacementEnabled(true)
+                .setGetBookieInfoRetryIntervalSeconds(1, TimeUnit.SECONDS)
+                .setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
+        final BookKeeperCheckInfoReader client = new BookKeeperCheckInfoReader(conf);
+
         for (int i=0; i < numBookies; i++) {
             // the first 8 bookies have freeDiskSpace of 1MB; While the remaining 2 have 3MB
             if (i < numBookies-2) {
-                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+                replaceBookieWithCustomFreeDiskSpaceBookie(client, 0, freeDiskSpace);
             } else {
-                replaceBookieWithCustomFreeDiskSpaceBookie(0, multiple*freeDiskSpace);
+                replaceBookieWithCustomFreeDiskSpaceBookie(client, 0, multiple*freeDiskSpace);
             }
         }
         Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
@@ -117,13 +161,6 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
             m.put(b.getLocalAddress(), 0);
         }
 
-        // wait a 100 msecs each for the bookies to come up and the bookieInfo to be retrieved by the client
-        ClientConfiguration conf = new ClientConfiguration()
-            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
-            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
-        Thread.sleep(200);
-        final BookKeeper client = new BookKeeper(conf);
-        Thread.sleep(200);
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
             for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
@@ -149,12 +186,20 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
     public void testDiskSpaceWeightedBookieSelectionWithChangingWeights() throws Exception {
         long freeDiskSpace=1000000L;
         int multiple=3;
+
+        ClientConfiguration conf = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setDiskWeightBasedPlacementEnabled(true)
+                .setGetBookieInfoRetryIntervalSeconds(1, TimeUnit.SECONDS)
+                .setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
+        final BookKeeperCheckInfoReader client = new BookKeeperCheckInfoReader(conf);
+
         for (int i=0; i < numBookies; i++) {
             // the first 8 bookies have freeDiskSpace of 1MB; While the remaining 2 have 3MB
             if (i < numBookies-2) {
-                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+                replaceBookieWithCustomFreeDiskSpaceBookie(client,0, freeDiskSpace);
             } else {
-                replaceBookieWithCustomFreeDiskSpaceBookie(0, multiple*freeDiskSpace);
+                replaceBookieWithCustomFreeDiskSpaceBookie(client,0, multiple*freeDiskSpace);
             }
         }
         Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
@@ -162,13 +207,6 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
             m.put(b.getLocalAddress(), 0);
         }
 
-        // wait a 100 msecs each for the bookies to come up and the bookieInfo to be retrieved by the client
-        ClientConfiguration conf = new ClientConfiguration()
-            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
-            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
-        Thread.sleep(100);
-        final BookKeeper client = new BookKeeper(conf);
-        Thread.sleep(100);
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
             for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
@@ -192,12 +230,11 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         BookieServer server3 = bs.get(numBookies-2);
         BookieServer server4 = bs.get(numBookies-1);
 
-        server1 = replaceBookieWithCustomFreeDiskSpaceBookie(server1, multiple*freeDiskSpace);
-        server2 = replaceBookieWithCustomFreeDiskSpaceBookie(server2, multiple*freeDiskSpace);
-        server3 = replaceBookieWithCustomFreeDiskSpaceBookie(server3, freeDiskSpace);
-        server4 = replaceBookieWithCustomFreeDiskSpaceBookie(server4, freeDiskSpace);
+        server1 = replaceBookieWithCustomFreeDiskSpaceBookie(client, server1, multiple*freeDiskSpace);
+        server2 = replaceBookieWithCustomFreeDiskSpaceBookie(client, server2, multiple*freeDiskSpace);
+        server3 = replaceBookieWithCustomFreeDiskSpaceBookie(client, server3, freeDiskSpace);
+        server4 = replaceBookieWithCustomFreeDiskSpaceBookie(client, server4, freeDiskSpace);
 
-        Thread.sleep(100);
         for (BookieServer b : bs) {
             m.put(b.getLocalAddress(), 0);
         }
@@ -231,12 +268,20 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
     public void testDiskSpaceWeightedBookieSelectionWithBookiesDying() throws Exception {
         long freeDiskSpace=1000000L;
         int multiple=3;
+
+        ClientConfiguration conf = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setDiskWeightBasedPlacementEnabled(true)
+                .setGetBookieInfoRetryIntervalSeconds(1, TimeUnit.SECONDS)
+                .setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
+        final BookKeeperCheckInfoReader client = new BookKeeperCheckInfoReader(conf);
+
         for (int i=0; i < numBookies; i++) {
             // the first 8 bookies have freeDiskSpace of 1MB; While the remaining 2 have 1GB
             if (i < numBookies-2) {
-                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+                replaceBookieWithCustomFreeDiskSpaceBookie(client, 0, freeDiskSpace);
             } else {
-                replaceBookieWithCustomFreeDiskSpaceBookie(0, multiple*freeDiskSpace);
+                replaceBookieWithCustomFreeDiskSpaceBookie(client, 0, multiple*freeDiskSpace);
             }
         }
         Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
@@ -244,13 +289,6 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
             m.put(b.getLocalAddress(), 0);
         }
 
-        // wait a couple of 100 msecs each for the bookies to come up and the bookieInfo to be retrieved by the client
-        ClientConfiguration conf = new ClientConfiguration()
-            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
-            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
-        Thread.sleep(100);
-        final BookKeeper client = new BookKeeper(conf);
-        Thread.sleep(100);
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
             for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
@@ -272,11 +310,9 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         }
         BookieServer server1 = bs.get(numBookies-2);
         BookieServer server2 = bs.get(numBookies-1);
-        killBookie(numBookies-1);
-        killBookie(numBookies-2);
+        killBookieAndWaitForZK(numBookies-1);
+        killBookieAndWaitForZK(numBookies-2);
 
-        // give some time for the cluster to become stable
-        Thread.sleep(100);
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
             for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
@@ -307,25 +343,26 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
     public void testDiskSpaceWeightedBookieSelectionWithBookiesBeingAdded() throws Exception {
         long freeDiskSpace=1000000L;
         int multiple=3;
+
+        ClientConfiguration conf = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setDiskWeightBasedPlacementEnabled(true)
+                .setGetBookieInfoRetryIntervalSeconds(1, TimeUnit.SECONDS)
+                .setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
+        final BookKeeperCheckInfoReader client = new BookKeeperCheckInfoReader(conf);
+
         for (int i=0; i < numBookies; i++) {
             // all the bookies have freeDiskSpace of 1MB
-            replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+            replaceBookieWithCustomFreeDiskSpaceBookie(client, 0, freeDiskSpace);
         }
         // let the last two bookies be down initially
-        ServerConfiguration conf1 = killBookie(numBookies-1);
-        ServerConfiguration conf2 = killBookie(numBookies-2);
+        ServerConfiguration conf1 = killBookieAndWaitForZK(numBookies-1);
+        ServerConfiguration conf2 = killBookieAndWaitForZK(numBookies-2);
         Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
         for (BookieServer b : bs) {
             m.put(b.getLocalAddress(), 0);
         }
 
-        // wait a bit for the bookies to come up and the bookieInfo to be retrieved by the client
-        ClientConfiguration conf = new ClientConfiguration()
-            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
-            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
-        Thread.sleep(100);
-        final BookKeeper client = new BookKeeper(conf);
-        Thread.sleep(100);
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
             for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
@@ -342,11 +379,9 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         }
 
         // bring up the two dead bookies; they'll also have 3X more free space than the rest of the bookies
-        restartBookie(conf1, multiple*freeDiskSpace, multiple*freeDiskSpace, 0);
-        restartBookie(conf2, multiple*freeDiskSpace, multiple*freeDiskSpace, 0);
+        restartBookie(client, conf1, multiple*freeDiskSpace, multiple*freeDiskSpace, null);
+        restartBookie(client, conf2, multiple*freeDiskSpace, multiple*freeDiskSpace, null);
 
-        // give some time for the cluster to become stable
-        Thread.sleep(100);
         for (BookieServer b : bs) {
             m.put(b.getLocalAddress(), 0);
         }
@@ -376,13 +411,25 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
     public void testDiskSpaceWeightedBookieSelectionWithPeriodicBookieInfoUpdate() throws Exception {
         long freeDiskSpace=1000000L;
         int multiple=3;
+
+        int updateIntervalSecs = 6;
+         ClientConfiguration conf = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setDiskWeightBasedPlacementEnabled(true)
+                .setGetBookieInfoRetryIntervalSeconds(1, TimeUnit.SECONDS)
+                .setBookieMaxWeightMultipleForWeightBasedPlacement(multiple)
+                .setGetBookieInfoIntervalSeconds(updateIntervalSecs, TimeUnit.SECONDS);
+        final BookKeeperCheckInfoReader client = new BookKeeperCheckInfoReader(conf);
+
+        AtomicBoolean useHigherValue = new AtomicBoolean(false);
         for (int i=0; i < numBookies; i++) {
             // the first 8 bookies have freeDiskSpace of 1MB; the remaining 2 will advertise 1MB for
-            // the first 3 seconds but then they'll advertise 3MB after the first 3 seconds
+            // the start of the test, and 3MB once useHigherValue is set
             if (i < numBookies-2) {
-                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+                replaceBookieWithCustomFreeDiskSpaceBookie(client, 0, freeDiskSpace);
             } else {
-                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace, multiple*freeDiskSpace, 2);
+                replaceBookieWithCustomFreeDiskSpaceBookie(
+                        client, 0, freeDiskSpace, multiple*freeDiskSpace, useHigherValue);
             }
         }
         Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
@@ -390,42 +437,29 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
             m.put(b.getLocalAddress(), 0);
         }
 
-        // the periodic bookieInfo is read once every 7 seconds
-        int updateIntervalSecs = 6;
-        ClientConfiguration conf = new ClientConfiguration()
-            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
-            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple).
-            setGetBookieInfoIntervalSeconds(updateIntervalSecs, TimeUnit.SECONDS);
-        // wait a bit for the bookies to come up and the bookieInfo to be retrieved by the client
-        Thread.sleep(100);
-        final BookKeeper client = new BookKeeper(conf);
-        Thread.sleep(100);
-        long startMsecs = MathUtils.now();
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
             for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
                 m.put(b, m.get(b)+1);
             }
         }
-        long elapsedMsecs = MathUtils.now() - startMsecs;
 
-        // make sure that all the bookies are chosen pretty much uniformly
-        int bookiesToCheck = numBookies-1;
-        if (elapsedMsecs > updateIntervalSecs*1000) {
-            // if this task longer than updateIntervalSecs, the weight for the last 2 bookies will be
-            // higher, so skip checking them
-            bookiesToCheck = numBookies-3;
-        }
-        for (int i=0; i < bookiesToCheck; i++) {
+        for (int i=0; i < numBookies - 1; i++) {
             double delta = Math.abs((double)m.get(bs.get(i).getLocalAddress())-(double)m.get(bs.get(i+1).getLocalAddress()));
             delta = (delta*100)/(double)m.get(bs.get(i+1).getLocalAddress());
             assertTrue("Weigheted placement is not honored: " + delta, delta <= 30); // the deviation should be <30%
         }
 
-        if (elapsedMsecs < updateIntervalSecs*1000) {
-            // sleep until periodic bookie info retrieval kicks in and it gets the updated
-            // freeDiskSpace for the last 2 bookies
-            Thread.sleep(updateIntervalSecs*1000 - elapsedMsecs);
+
+        // Sleep for double the time required to update the bookie infos, and then check each one
+        useHigherValue.set(true);
+        Thread.sleep(updateIntervalSecs * 1000);
+        for (int i=0; i < numBookies; i++) {
+            if (i < numBookies-2) {
+                client.blockUntilBookieWeightIs(bs.get(i).getLocalAddress(), Optional.of(freeDiskSpace));
+            } else {
+                client.blockUntilBookieWeightIs(bs.get(i).getLocalAddress(), Optional.of(freeDiskSpace * multiple));
+            }
         }
 
         for (BookieServer b : bs) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index a2f66bb..3b8d2a4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -322,6 +322,26 @@ public abstract class BookKeeperClusterTestCase {
     }
 
     /**
+     * Kill bookie by index and verify that it's stopped
+     *
+     * @param index index of bookie to kill
+     *
+     * @return configuration of killed bookie
+     */
+    public ServerConfiguration killBookieAndWaitForZK(int index) throws Exception {
+        if (index >= bs.size()) {
+            throw new IOException("Bookie does not exist");
+        }
+        BookieServer server = bs.get(index);
+        ServerConfiguration ret = killBookie(index);
+        while (zkc.exists(baseConf.getZkAvailableBookiesPath() + "/"
+                + server.getLocalAddress().toString(), false) != null) {
+            Thread.sleep(500);
+        }
+        return ret;
+    }
+
+    /**
      * Sleep a bookie
      *
      * @param addr

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>'].

Mime
View raw message