bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-895: GC ledgers that are no longer a part of the ensemble
Date Wed, 27 Apr 2016 07:55:27 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 40af841a3 -> dd575a16d


BOOKKEEPER-895: GC ledgers that are no longer a part of the ensemble

Author: Siddharth Boobna <sboobna@yahoo-inc.com>

Reviewers: Matteo Merli <mmerli@apache.org>, Guo Sijie <sijie@apache.org>

Closes #25 from sboobna/BOOKKEEPER-895


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/dd575a16
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/dd575a16
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/dd575a16

Branch: refs/heads/master
Commit: dd575a16decbeaab1f230db271bc400d7e434216
Parents: 40af841
Author: Siddharth Boobna <sboobna@yahoo-inc.com>
Authored: Wed Apr 27 00:55:19 2016 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Wed Apr 27 00:55:19 2016 -0700

----------------------------------------------------------------------
 bookkeeper-server/conf/bk_server.conf           |   5 +
 .../bookie/GarbageCollectorThread.java          |   2 +-
 .../bookie/ScanAndCompareGarbageCollector.java  | 128 +++++++++-
 .../bookkeeper/conf/ServerConfiguration.java    |  32 ++-
 .../meta/ZkLedgerUnderreplicationManager.java   | 119 +++++----
 .../bookkeeper/util/BookKeeperConstants.java    |   1 +
 .../bookie/TestGcOverreplicatedLedger.java      | 240 +++++++++++++++++++
 .../apache/bookkeeper/meta/GcLedgersTest.java   | 119 +--------
 .../bookkeeper/meta/LedgerManagerTestCase.java  | 118 ++++++++-
 .../test/BookKeeperClusterTestCase.java         |  17 ++
 10 files changed, 619 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/conf/bk_server.conf
----------------------------------------------------------------------
diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index f73e633..b3f1637 100644
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -162,6 +162,11 @@ ledgerDirectories=/tmp/bk-data
 # interval if there is enough disk capacity.
 # gcWaitTime=1000
 
+# Interval between garbage collection runs for overreplicated ledgers,
+# in milliseconds [Default: 1 day]. This should not be run very frequently, since it reads
+# the metadata for all the ledgers on the bookie from ZooKeeper.
+# gcOverreplicatedLedgerWaitTime=86400000
+
 # How long the interval to flush ledger index pages to disk, in milliseconds
 # Flushing index files will introduce much random disk I/O.
 # If separating journal dir and ledger dirs each on different devices,

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 4d7da25..2821ec8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -244,7 +244,7 @@ public class GarbageCollectorThread extends BookieThread {
             }
         };
 
-        this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage);
+        this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage, conf);
 
         // compaction parameters
         minorCompactionThreshold = conf.getMinorCompactionThreshold();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
index 2b4e3c0..05cd958 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
@@ -21,12 +21,27 @@
 
 package org.apache.bookkeeper.bookie;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,12 +64,32 @@ import com.google.common.collect.Sets;
 public class ScanAndCompareGarbageCollector implements GarbageCollector{
 
     static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class);
+    static final int MAX_CONCURRENT_ZK_REQUESTS = 1000;
+
     private final LedgerManager ledgerManager;
     private final CompactableLedgerStorage ledgerStorage;
-
-    public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage) {
+    private final ServerConfiguration conf;
+    private final BookieSocketAddress selfBookieAddress;
+    private ZooKeeper zk = null;
+    private boolean enableGcOverReplicatedLedger;
+    private final long gcOverReplicatedLedgerIntervalMillis;
+    private long lastOverReplicatedLedgerGcTimeMillis;
+    private final String zkLedgersRootPath;
+
+    public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage,
+            ServerConfiguration conf) throws IOException {
         this.ledgerManager = ledgerManager;
         this.ledgerStorage = ledgerStorage;
+        this.conf = conf;
+        this.selfBookieAddress = Bookie.getBookieAddress(conf);
+        this.gcOverReplicatedLedgerIntervalMillis = conf.getGcOverreplicatedLedgerWaitTimeMillis();
+        this.lastOverReplicatedLedgerGcTimeMillis = MathUtils.now();
+        if (gcOverReplicatedLedgerIntervalMillis > 0) {
+            this.enableGcOverReplicatedLedger = true;
+        }
+        this.zkLedgersRootPath = conf.getZkLedgersRootPath();
+        LOG.info("Over Replicated Ledger Deletion : enabled=" + enableGcOverReplicatedLedger + ", interval="
+                + gcOverReplicatedLedgerIntervalMillis);
     }
 
     @Override
@@ -75,6 +110,22 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector{
 
             long lastEnd = -1;
 
+            long curTime = MathUtils.now();
+            boolean checkOverreplicatedLedgers = (enableGcOverReplicatedLedger && curTime
+                    - lastOverReplicatedLedgerGcTimeMillis > gcOverReplicatedLedgerIntervalMillis);
+            if (checkOverreplicatedLedgers) {
+                zk = ZooKeeperClient.newBuilder().connectString(conf.getZkServers())
+                        .sessionTimeoutMs(conf.getZkTimeout()).build();
+                // remove all the overreplicated ledgers from the local bookie
+                Set<Long> overReplicatedLedgers = removeOverReplicatedledgers(bkActiveLedgers, garbageCleaner);
+                if (overReplicatedLedgers.isEmpty()) {
+                    LOG.info("No over-replicated ledgers found.");
+                } else {
+                    LOG.info("Removed over-replicated ledgers: {}", overReplicatedLedgers);
+                }
+                lastOverReplicatedLedgerGcTimeMillis = MathUtils.now();
+            }
+
             while(ledgerRangeIterator.hasNext()) {
                 LedgerRange lRange = ledgerRangeIterator.next();
 
@@ -100,8 +151,79 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector{
         } catch (Exception e) {
             // ignore exception, collecting garbage next time
             LOG.warn("Exception when iterating over the metadata {}", e);
+        } finally {
+            if (zk != null) {
+                try {
+                    zk.close();
+                } catch (InterruptedException e) {
+                    LOG.error("Error closing zk session", e);
+                }
+                zk = null;
+            }
         }
     }
-}
 
+    // Cleans local ledgers that are closed and list this bookie in no ensemble; returns the cleaned ledger ids.
+    private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, final GarbageCleaner garbageCleaner)
+            throws InterruptedException, KeeperException {
+        final Set<Long> overReplicatedLedgers = Sets.newHashSet();
+        final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_ZK_REQUESTS);
+        final CountDownLatch latch = new CountDownLatch(bkActiveledgers.size());
+        for (final Long ledgerId : bkActiveledgers) {
+            try {
+                // check if the ledger is being replicated already by the replication worker
+                if (ZkLedgerUnderreplicationManager.isLedgerBeingReplicated(zk, zkLedgersRootPath, ledgerId)) {
+                    latch.countDown();
+                    continue;
+                }
+                // take the underreplicated ledger lock so that no bookie replicates the ledger while it is checked
+                // for deletion: that could put the current bookie back into the ensemble, and the ledger must stay
+                ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(zk, zkLedgersRootPath, ledgerId);
+                semaphore.acquire();
+                ledgerManager.readLedgerMetadata(ledgerId, new GenericCallback<LedgerMetadata>() {
+                    @Override
+                    public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
+                        try {
+                            // skip failed reads and open ledgers, whose ensemble may still change to include us
+                            if (rc != BKException.Code.OK || !ledgerMetadata.isClosed()) {
+                                return;
+                            }
+                            SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = ledgerMetadata.getEnsembles();
+                            for (ArrayList<BookieSocketAddress> ensemble : ensembles.values()) {
+                                // check if this bookie is supposed to have this ledger
+                                if (ensemble.contains(selfBookieAddress)) {
+                                    return;
+                                }
+                            }
+                            // not in any ensemble: clean the ledger, and record it only once the cleaner returned
+                            garbageCleaner.clean(ledgerId);
+                            overReplicatedLedgers.add(ledgerId);
+                        } finally {
+                            // always release, even if the cleaner throws; otherwise latch.await() blocks forever
+                            release();
+                        }
+                    }
 
+                    private void release() {
+                        semaphore.release();
+                        try {
+                            ZkLedgerUnderreplicationManager.releaseUnderreplicatedLedgerLock(zk, zkLedgersRootPath,
+                                    ledgerId);
+                        } catch (Exception e) {
+                            LOG.error("Error removing underreplicated lock for ledger {}", ledgerId, e);
+                        } finally {
+                            // count down last, so gc() cannot close the zk session while the lock is being released
+                            latch.countDown();
+                        }
+                    }
+                });
+            } catch (Exception e) {
+                LOG.error("Exception when iterating through the ledgers to check for over-replication", e);
+                latch.countDown();
+            }
+        }
+        latch.await();
+        bkActiveledgers.removeAll(overReplicatedLedgers);
+        return overReplicatedLedgers;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index c717cd8..7d9b697 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -19,8 +19,10 @@ package org.apache.bookkeeper.conf;
 
 import java.io.File;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.Beta;
+
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -51,6 +53,7 @@ public class ServerConfiguration extends AbstractConfiguration {
     // Gc Parameters
     protected final static String GC_WAIT_TIME = "gcWaitTime";
     protected final static String IS_FORCE_GC_ALLOW_WHEN_NO_SPACE = "isForceGCAllowWhenNoSpace";
+    protected final static String GC_OVERREPLICATED_LEDGER_WAIT_TIME = "gcOverreplicatedLedgerWaitTime";
     // Sync Parameters
     protected final static String FLUSH_INTERVAL = "flushInterval";
     // Bookie death watch interval
@@ -210,10 +213,37 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get the wait time, in milliseconds, between garbage collections of overreplicated ledgers.
+     *
+     * @return gc wait time in milliseconds; 1 day when the property is not set
+     */
+    public long getGcOverreplicatedLedgerWaitTimeMillis() {
+        return this.getLong(GC_OVERREPLICATED_LEDGER_WAIT_TIME, TimeUnit.DAYS.toMillis(1));
+    }
+
+    /**
+     * Set wait time for garbage collection of overreplicated ledgers. Default: 1 day.
+     * A ledger can be overreplicated under the following circumstances:
+     * 1. The ledger with few entries has bk1 and bk2 as its ensemble.
+     * 2. bk1 crashes.
+     * 3. bk3 replicates the ledger from bk2 and updates the ensemble to bk2 and bk3.
+     * 4. bk1 comes back up.
+     * 5. Now there are 3 copies of the ledger.
+     *
+     * @param gcWaitTime wait time between two collections, expressed in {@code unit}
+     * @param unit time unit of {@code gcWaitTime}; the value is stored in milliseconds
+     * @return server configuration
+     */
+    public ServerConfiguration setGcOverreplicatedLedgerWaitTime(long gcWaitTime, TimeUnit unit) {
+        this.setProperty(GC_OVERREPLICATED_LEDGER_WAIT_TIME, Long.toString(unit.toMillis(gcWaitTime)));
+        return this;
+    }
+
+    /**
      * Get flush interval. Default value is 10 second. It isn't useful to decrease
      * this value, since ledger storage only checkpoints when an entry logger file
      * is rolled.
-     *
+     * 
      * @return flush interval
      */
     public int getFlushInterval() {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index a4600bc..c49c5a2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -18,51 +18,45 @@
 
 package org.apache.bookkeeper.meta;
 
+import static com.google.common.base.Charsets.UTF_8;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.net.DNS;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
+import org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
+import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
 import org.apache.bookkeeper.replication.ReplicationEnableCb;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.ZkUtils;
-
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
-import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
-import org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.TextFormat;
 import com.google.common.base.Joiner;
-import static com.google.common.base.Charsets.UTF_8;
-
-import java.net.UnknownHostException;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.Collections;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.ArrayList;
-
-import java.util.regex.Pattern;
-import java.util.regex.Matcher;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.protobuf.TextFormat;
 
 /**
  * ZooKeeper implementation of underreplication manager.
@@ -84,6 +78,8 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     static final String LAYOUT="BASIC";
     static final int LAYOUT_VERSION=1;
 
+    public static final byte[] LOCK_DATA = getLockData();
+
     private static class Lock {
         private final String lockZNode;
         private final int ledgerZNodeVersion;
@@ -103,22 +99,32 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     private final String urLedgerPath;
     private final String urLockPath;
     private final String layoutZNode;
-    private final LockDataFormat lockData;
 
     private final ZooKeeper zkc;
 
     public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc)
             throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
-        basePath = conf.getZkLedgersRootPath() + '/'
-                + BookKeeperConstants.UNDER_REPLICATION_NODE;
+        basePath = getBasePath(conf.getZkLedgersRootPath());
         layoutZNode = basePath + '/' + BookKeeperConstants.LAYOUT_ZNODE;
         urLedgerPath = basePath
                 + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
-        urLockPath = basePath + "/locks";
+        urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
 
         idExtractionPattern = Pattern.compile("urL(\\d+)$");
         this.zkc = zkc;
 
+        checkLayout();
+    }
+
+    public static String getBasePath(String rootPath) {
+        return String.format("%s/%s", rootPath, BookKeeperConstants.UNDER_REPLICATION_NODE);
+    }
+
+    public static String getUrLockPath(String rootPath) {
+        return String.format("%s/%s", getBasePath(rootPath), BookKeeperConstants.UNDER_REPLICATION_LOCK);
+    }
+
+    public static byte[] getLockData() {
         LockDataFormat.Builder lockDataBuilder = LockDataFormat.newBuilder();
         try {
             lockDataBuilder.setBookieId(DNS.getDefaultHost("default"));
@@ -126,9 +132,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             // if we cant get the address, ignore. it's optional
             // in the data structure in any case
         }
-        lockData = lockDataBuilder.build();
-
-        checkLayout();
+        return TextFormat.printToString(lockDataBuilder.build()).getBytes(UTF_8);
     }
 
     private void checkLayout()
@@ -212,6 +216,10 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         return String.format("%s/urL%010d", getParentZnodePath(base, ledgerId), ledgerId);
     }
 
+    public static String getUrLedgerLockZnode(String base, long ledgerId) {
+        return String.format("%s/urL%010d", base, ledgerId);
+    }
+
     private String getUrLedgerZnode(long ledgerId) {
         return getUrLedgerZnode(urLedgerPath, ledgerId);
     }
@@ -399,8 +407,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                     }
 
                     long ledgerId = getLedgerId(tryChild);
-                    zkc.create(lockPath, TextFormat.printToString(lockData).getBytes(UTF_8),
-                               Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+                    zkc.create(lockPath, LOCK_DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                     heldLocks.put(ledgerId, new Lock(lockPath, stat.getVersion()));
                     return ledgerId;
                 } catch (KeeperException.NodeExistsException nee) {
@@ -627,4 +634,32 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                     "Interrupted while contacting zookeeper", ie);
         }
     }
+
+    /**
+     * Check whether the ledger is being replicated by any bookie, i.e. whether its lock znode exists
+     */
+    public static boolean isLedgerBeingReplicated(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
+            throws KeeperException,
+            InterruptedException {
+        return zkc.exists(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), false) != null;
+    }
+
+    /**
+     * Acquire the underreplicated ledger lock by creating the ledger's lock znode as an ephemeral node
+     */
+    public static void acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
+            throws KeeperException, InterruptedException {
+        ZkUtils.createFullPathOptimistic(zkc, getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId),
+                LOCK_DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    }
+
+    /** Release the underreplicated ledger lock if it exists; an already deleted lock znode is not an error. */
+    public static void releaseUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
+            throws InterruptedException, KeeperException {
+        try {
+            zkc.delete(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), -1);
+        } catch (KeeperException.NoNodeException ignored) {
+            // The lock znode is already gone, so there is nothing left to release.
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index 2c2ba38..987de7a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -38,6 +38,7 @@ public class BookKeeperConstants {
     public static final String AVAILABLE_NODE = "available";
     public static final String COOKIE_NODE = "cookies";
     public static final String UNDER_REPLICATION_NODE = "underreplication";
+    public static final String UNDER_REPLICATION_LOCK = "locks";
     public static final String DISABLE_NODE = "disable";
     public static final String DEFAULT_ZK_LEDGERS_ROOT_PATH = "/ledgers";
     public static final String LAYOUT_ZNODE = "LAYOUT";

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
new file mode 100644
index 0000000..5004817
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
@@ -0,0 +1,240 @@
+/**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  *
+  */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.FlatLedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerManagerTestCase;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class TestGcOverreplicatedLedger extends LedgerManagerTestCase {
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        ledgerManager = ledgerManagerFactory.newLedgerManager();
+        activeLedgers = new SnapshotMap<Long, Boolean>();
+    }
+
+    public TestGcOverreplicatedLedger(Class<? extends LedgerManagerFactory> lmFactoryCls) {
+        super(lmFactoryCls, 3);
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { { FlatLedgerManagerFactory.class } });
+    }
+
+    @Test(timeout = 60000)
+    public void testGcOverreplicatedLedger() throws Exception {
+        LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
+        activeLedgers.put(lh.getId(), true);
+
+        final AtomicReference<LedgerMetadata> newLedgerMetadata = new AtomicReference<>(null);
+        final CountDownLatch latch = new CountDownLatch(1);
+        ledgerManager.readLedgerMetadata(lh.getId(), new GenericCallback<LedgerMetadata>() {
+
+            @Override
+            public void operationComplete(int rc, LedgerMetadata result) {
+                if (rc == BKException.Code.OK) {
+                    newLedgerMetadata.set(result);
+                }
+                latch.countDown();
+            }
+        });
+        latch.await();
+        if (newLedgerMetadata.get() == null) {
+            Assert.fail("No ledger metadata found");
+        }
+        BookieSocketAddress bookieNotInEnsemble = getBookieNotInEnsemble(newLedgerMetadata.get());
+        ServerConfiguration bkConf = getBkConf(bookieNotInEnsemble);
+        bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS);
+
+        lh.close();
+
+        final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
+        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, mockLedgerStorage,
+                bkConf);
+        Thread.sleep(bkConf.getGcOverreplicatedLedgerWaitTimeMillis() + 1);
+        garbageCollector.gc(new GarbageCleaner() {
+
+            @Override
+            public void clean(long ledgerId) {
+                try {
+                    mockLedgerStorage.deleteLedger(ledgerId);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    return;
+                }
+            }
+        });
+
+        Assert.assertFalse(activeLedgers.containsKey(lh.getId()));
+    }
+
+    @Test(timeout = 60000)
+    public void testNoGcOfLedger() throws Exception {
+        LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
+        activeLedgers.put(lh.getId(), true);
+
+        final AtomicReference<LedgerMetadata> newLedgerMetadata = new AtomicReference<>(null);
+        final CountDownLatch latch = new CountDownLatch(1);
+        ledgerManager.readLedgerMetadata(lh.getId(), new GenericCallback<LedgerMetadata>() {
+
+            @Override
+            public void operationComplete(int rc, LedgerMetadata result) {
+                if (rc == BKException.Code.OK) {
+                    newLedgerMetadata.set(result);
+                }
+                latch.countDown();
+            }
+        });
+        latch.await();
+        if (newLedgerMetadata.get() == null) {
+            Assert.fail("No ledger metadata found");
+        }
+        BookieSocketAddress address = null;
+        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembleMap = newLedgerMetadata.get().getEnsembles();
+        for (ArrayList<BookieSocketAddress> ensemble : ensembleMap.values()) {
+            address = ensemble.get(0);
+        }
+        ServerConfiguration bkConf = getBkConf(address);
+        bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS);
+
+        lh.close();
+
+        final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
+        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, mockLedgerStorage,
+                bkConf);
+        Thread.sleep(bkConf.getGcOverreplicatedLedgerWaitTimeMillis() + 1);
+        garbageCollector.gc(new GarbageCleaner() {
+
+            @Override
+            public void clean(long ledgerId) {
+                try {
+                    mockLedgerStorage.deleteLedger(ledgerId);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    return;
+                }
+            }
+        });
+
+        Assert.assertTrue(activeLedgers.containsKey(lh.getId()));
+    }
+
+    @Test(timeout = 60000)
+    public void testNoGcIfLedgerBeingReplicated() throws Exception {
+        LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
+        activeLedgers.put(lh.getId(), true);
+
+        final AtomicReference<LedgerMetadata> newLedgerMetadata = new AtomicReference<>(null);
+        final CountDownLatch latch = new CountDownLatch(1);
+        ledgerManager.readLedgerMetadata(lh.getId(), new GenericCallback<LedgerMetadata>() {
+
+            @Override
+            public void operationComplete(int rc, LedgerMetadata result) {
+                if (rc == BKException.Code.OK) {
+                    newLedgerMetadata.set(result);
+                }
+                latch.countDown();
+            }
+        });
+        latch.await();
+        if (newLedgerMetadata.get() == null) {
+            Assert.fail("No ledger metadata found");
+        }
+        BookieSocketAddress bookieNotInEnsemble = getBookieNotInEnsemble(newLedgerMetadata.get());
+        ServerConfiguration bkConf = getBkConf(bookieNotInEnsemble);
+        bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS);
+
+        lh.close();
+
+        ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(zkc, baseConf.getZkLedgersRootPath(),
+                lh.getId());
+
+        final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
+        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, mockLedgerStorage,
+                bkConf);
+        Thread.sleep(bkConf.getGcOverreplicatedLedgerWaitTimeMillis() + 1);
+        garbageCollector.gc(new GarbageCleaner() {
+
+            @Override
+            public void clean(long ledgerId) {
+                try {
+                    mockLedgerStorage.deleteLedger(ledgerId);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    return;
+                }
+            }
+        });
+
+        Assert.assertTrue(activeLedgers.containsKey(lh.getId()));
+    }
+
+    private BookieSocketAddress getBookieNotInEnsemble(LedgerMetadata ledgerMetadata) throws UnknownHostException {
+        List<BookieSocketAddress> allAddresses = Lists.newArrayList(); // candidates: every server in bs
+        for (BookieServer bk : bs) {
+            allAddresses.add(bk.getLocalAddress());
+        }
+        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = ledgerMetadata.getEnsembles();
+        for (ArrayList<BookieSocketAddress> fragmentEnsembles : ensembles.values()) {
+            for (BookieSocketAddress ensemble : fragmentEnsembles) {
+                allAddresses.remove(ensemble); // drop any bookie that appears in a fragment's ensemble
+            }
+        }
+        Assert.assertEquals("Expected exactly one bookie outside the ledger's ensembles", 1, allAddresses.size());
+        return allAddresses.get(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 3875ec4..d5866a5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,15 +21,18 @@
 
 package org.apache.bookkeeper.meta;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
@@ -39,21 +42,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
-import org.apache.bookkeeper.bookie.EntryLocation;
-import org.apache.bookkeeper.bookie.CheckpointSource;
-import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.GarbageCollector;
-import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -62,8 +56,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
-
 /**
  * Test garbage collection ledgers in ledger manager
  */
@@ -175,7 +167,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         final CountDownLatch endLatch = new CountDownLatch(2);
         final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
         final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
-                mockLedgerStorage);
+                mockLedgerStorage, baseConf);
         Thread gcThread = new Thread() {
             @Override
             public void run() {
@@ -246,7 +238,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         createLedgers(numLedgers, createdLedgers);
 
         final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
-                new MockLedgerStorage());
+                new MockLedgerStorage(), baseConf);
         GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
                 @Override
                 public void clean(long ledgerId) {
@@ -282,7 +274,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         createLedgers(numLedgers, createdLedgers);
 
         final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
-                new MockLedgerStorage());
+                new MockLedgerStorage(), baseConf);
         GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
                 @Override
                 public void clean(long ledgerId) {
@@ -309,97 +301,4 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         assertEquals("Should have cleaned something", 1, cleaned.size());
         assertEquals("Should have cleaned first ledger" + first, (long)first, (long)cleaned.get(0));
     }
-
-    class MockLedgerStorage implements CompactableLedgerStorage {
-
-        @Override
-        public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
-                               LedgerDirsManager ledgerDirsManager,
-                               LedgerDirsManager indexDirsManager,
-                               CheckpointSource checkpointSource, StatsLogger statsLogger)
-                throws IOException {}
-
-        @Override
-        public void start() {
-        }
-
-        @Override
-        public void shutdown() throws InterruptedException {
-        }
-
-        @Override
-        public boolean ledgerExists(long ledgerId) throws IOException {
-            return false;
-        }
-
-        @Override
-        public boolean setFenced(long ledgerId) throws IOException {
-            return false;
-        }
-
-        @Override
-        public boolean isFenced(long ledgerId) throws IOException {
-            return false;
-        }
-
-        @Override
-        public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
-        }
-
-        @Override
-        public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
-            return null;
-        }
-
-        @Override
-        public long addEntry(ByteBuffer entry) throws IOException {
-            return 0;
-        }
-
-        @Override
-        public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
-            return null;
-        }
-
-        @Override
-        public void flush() throws IOException {
-        }
-
-        @Override
-        public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
-            return null;
-        }
-
-        @Override
-        public void deleteLedger(long ledgerId) throws IOException {
-            activeLedgers.remove(ledgerId);
-        }
-
-        @Override
-        public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) {
-            NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot();
-            Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot
-                    .subMap(firstLedgerId, true, lastLedgerId, false);
-
-            return subBkActiveLedgers.keySet();
-        }
-
-        @Override
-        public BKMBeanInfo getJMXBean() {
-            return null;
-        }
-
-        @Override
-        public EntryLogger getEntryLogger() {
-            return null;
-        }
-
-        @Override
-        public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
-        }
-
-        @Override
-        public void flushEntriesLocationsIndex() throws IOException {
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index dae61bc..1e7c9a6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -22,9 +22,22 @@
 package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
 
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.jmx.BKMBeanInfo;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.SnapshotMap;
 import org.junit.After;
@@ -42,13 +55,17 @@ import org.slf4j.LoggerFactory;
 public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
     static final Logger LOG = LoggerFactory.getLogger(LedgerManagerTestCase.class);
 
-    LedgerManagerFactory ledgerManagerFactory;
-    LedgerManager ledgerManager = null;
-    LedgerIdGenerator ledgerIdGenerator = null;
-    SnapshotMap<Long, Boolean> activeLedgers = null;
+    protected LedgerManagerFactory ledgerManagerFactory;
+    protected LedgerManager ledgerManager = null;
+    protected LedgerIdGenerator ledgerIdGenerator = null;
+    protected SnapshotMap<Long, Boolean> activeLedgers = null;
 
     public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls) {
-        super(0);
+        this(lmFactoryCls, 0);
+    }
+
+    public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls, int numBookies) {
+        super(numBookies);
         activeLedgers = new SnapshotMap<Long, Boolean>();
         baseConf.setLedgerManagerFactoryClass(lmFactoryCls);
     }
@@ -93,4 +110,95 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         super.tearDown();
     }
 
+    public class MockLedgerStorage implements CompactableLedgerStorage {
+
+        @Override
+        public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
+                LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                CheckpointSource checkpointSource, StatsLogger statsLogger) throws IOException {
+        }
+
+        @Override
+        public void start() {
+        }
+
+        @Override
+        public void shutdown() throws InterruptedException {
+        }
+
+        @Override
+        public boolean ledgerExists(long ledgerId) throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean setFenced(long ledgerId) throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean isFenced(long ledgerId) throws IOException {
+            return false;
+        }
+
+        @Override
+        public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        }
+
+        @Override
+        public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+            return null;
+        }
+
+        @Override
+        public long addEntry(ByteBuffer entry) throws IOException {
+            return 0;
+        }
+
+        @Override
+        public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void flush() throws IOException {
+        }
+
+        @Override
+        public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void deleteLedger(long ledgerId) throws IOException {
+            activeLedgers.remove(ledgerId);
+        }
+
+        @Override
+        public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) {
+            NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot();
+            Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot
+                    .subMap(firstLedgerId, true, lastLedgerId, false);
+
+            return subBkActiveLedgers.keySet();
+        }
+
+        @Override
+        public BKMBeanInfo getJMXBean() {
+            return null;
+        }
+
+        @Override
+        public EntryLogger getEntryLogger() {
+            return null;
+        }
+
+        @Override
+        public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
+        }
+
+        @Override
+        public void flushEntriesLocationsIndex() throws IOException {
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 28de0b2..efb8375 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -224,6 +224,23 @@ public abstract class BookKeeperClusterTestCase {
     }
 
     /**
+     * Get the configuration of the bookie server whose local address equals addr, or null if none matches.
+     */
+    public ServerConfiguration getBkConf(BookieSocketAddress addr) throws Exception {
+        int bkIndex = 0;
+        for (BookieServer server : bs) {
+            if (server.getLocalAddress().equals(addr)) {
+                break;
+            }
+            ++bkIndex;
+        }
+        if (bkIndex < bs.size()) {
+            return bsConfs.get(bkIndex);
+        }
+        return null;
+    }
+
+    /**
      * Kill a bookie by its socket address. Also, stops the autorecovery process
      * for the corresponding bookie server, if isAutoRecoveryEnabled is true.
      *


Mime
View raw message