bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From reddycha...@apache.org
Subject [bookkeeper] branch master updated: BOOKKEEPER-1028 and BOOKKEEPER-1029
Date Fri, 07 Jul 2017 03:11:35 GMT
This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 29eb420  BOOKKEEPER-1028 and BOOKKEEPER-1029
29eb420 is described below

commit 29eb420f1cca07c1a3a7748ed7fbb8a5bc54f2fa
Author: cguttapalem <cguttapalem@salesforce.com>
AuthorDate: Thu Jul 6 20:11:25 2017 -0700

    BOOKKEEPER-1028 and BOOKKEEPER-1029
    
        BOOKKEEPER-1028: inc/excl opts listunderreplicated
    
        - Introduce including and excluding BookieId options
        for listunderreplicatedLedgers
    
        - Limitation: since the replicas list is not updated in the
        underreplicated ledger zNode, the listed information may
        be stale
    
          ---------------------------------------------------------
    
        BOOKKEEPER-1029: BookieDecommision Workflow
    
        - LostBookieRecoveryDelay config param is stored in ZK
        - if LostBookieRecoveryDelay is reset to same value then it force triggers audit immediately
        - Added logic to trigger immediately or schedule pending audittask depending on the changed value in ZK
        - good number of testcases validating force-trigger/rescheduling of the audit task
        - added bookieshell command to get/set LostBookieRecoveryDelay from ZK
        - added bookieshell command to triggeraudit by resetting LostBookieRecoveryDelay
        - added decommissionbookie bkshell command, which validates the complete replication of ledgers stored in the bookie
    
    Author: cguttapalem <cguttapalem@salesforce.com>
    Author: Charan Reddy Guttapalem <cguttapalem@salesforce.com>
    
    Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <guosijie@gmail.com>
    
    This closes #127 from reddycharan/listunderreplicatedpredicate and squashes the following commits:
    
    34bacf3c [cguttapalem] BOOKKEEPER-1029: BookieDecommision Workflow
    eb43ec49 [Charan Reddy Guttapalem] BOOKKEEPER-1029: BookieDecommision Workflow
    fcb399df [cguttapalem] BOOKKEEPER-1028: inc/excl opts listunderreplicated
---
 bookkeeper-server/conf/log4j.shell.properties      |   1 +
 .../org/apache/bookkeeper/bookie/BookieShell.java  | 176 +++++++++++-
 .../apache/bookkeeper/client/BookKeeperAdmin.java  | 309 +++++++++++++++++++++
 .../apache/bookkeeper/client/LedgerChecker.java    |   5 +
 .../apache/bookkeeper/client/LedgerMetadata.java   |   2 +-
 .../meta/LedgerUnderreplicationManager.java        |  52 +++-
 .../meta/ZkLedgerUnderreplicationManager.java      | 122 +++++++-
 .../org/apache/bookkeeper/replication/Auditor.java | 144 ++++++++--
 .../replication/ReplicationException.java          |   2 +-
 .../bookkeeper/util/BookKeeperConstants.java       |   1 +
 .../bookkeeper/client/BookKeeperAdminTest.java     | 230 +++++++++++++++
 .../replication/AuditorLedgerCheckerTest.java      | 267 +++++++++++++++++-
 12 files changed, 1262 insertions(+), 49 deletions(-)

diff --git a/bookkeeper-server/conf/log4j.shell.properties b/bookkeeper-server/conf/log4j.shell.properties
index dcdc77c..58d6ea6 100644
--- a/bookkeeper-server/conf/log4j.shell.properties
+++ b/bookkeeper-server/conf/log4j.shell.properties
@@ -39,3 +39,4 @@ log4j.appender.CONSOLE.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n
 log4j.logger.org.apache.zookeeper=ERROR
 log4j.logger.org.apache.bookkeeper=ERROR
 log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
+log4j.logger.org.apache.bookkeeper.client.BookKeeperAdmin=INFO
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 3fc9d25..42d4de4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -44,15 +44,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
@@ -94,7 +95,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractFuture;
 
-
 /**
  * Bookie Shell is to provide utilities for users to administer a bookkeeper cluster.
  */
@@ -126,6 +126,9 @@ public class BookieShell implements Tool {
     static final String CMD_UPDATELEDGER = "updateledgers";
     static final String CMD_DELETELEDGER = "deleteledger";
     static final String CMD_BOOKIEINFO = "bookieinfo";
+    static final String CMD_DECOMMISSIONBOOKIE = "decommissionbookie";
+    static final String CMD_LOSTBOOKIERECOVERYDELAY = "lostbookierecoverydelay"; 
+    static final String CMD_TRIGGERAUDIT = "triggeraudit";
     static final String CMD_HELP = "help";
 
     final ServerConfiguration bkConf = new ServerConfiguration();
@@ -498,6 +501,8 @@ public class BookieShell implements Tool {
 
         public ListUnderreplicatedCmd() {
             super(CMD_LISTUNDERREPLICATED);
+            opts.addOption("missingreplica", true, "Bookie Id of missing replica");
+            opts.addOption("excludingmissingreplica", true, "Bookie Id of missing replica to ignore");
         }
 
         @Override
@@ -507,16 +512,30 @@ public class BookieShell implements Tool {
 
         @Override
         String getDescription() {
-            return "List ledgers marked as underreplicated";
+            return "List ledgers marked as underreplicated, with optional options to specify missingreplica (BookieId) and to exclude missingreplica";
         }
 
         @Override
         String getUsage() {
-            return "listunderreplicated";
+            return "listunderreplicated [[-missingreplica <bookieaddress>] [-excludingmissingreplica <bookieaddress>]]";
         }
 
         @Override
         int runCmd(CommandLine cmdLine) throws Exception {
+
+            final String includingBookieId = cmdLine.getOptionValue("missingreplica");
+            final String excludingBookieId = cmdLine.getOptionValue("excludingmissingreplica");
+
+            Predicate<List<String>> predicate = null;
+            if (!StringUtils.isBlank(includingBookieId) && !StringUtils.isBlank(excludingBookieId)) {
+                predicate = replicasList -> (replicasList.contains(includingBookieId)
+                        && !replicasList.contains(excludingBookieId));
+            } else if (!StringUtils.isBlank(includingBookieId)) {
+                predicate = replicasList -> replicasList.contains(includingBookieId);
+            } else if (!StringUtils.isBlank(excludingBookieId)) {
+                predicate = replicasList -> !replicasList.contains(excludingBookieId);
+            }
+
             ZooKeeper zk = null;
             try {
                 zk = ZooKeeperClient.newBuilder()
@@ -525,7 +544,7 @@ public class BookieShell implements Tool {
                         .build();
                 LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
                 LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
-                Iterator<Long> iter = underreplicationManager.listLedgersToRereplicate();
+                Iterator<Long> iter = underreplicationManager.listLedgersToRereplicate(predicate);
                 while (iter.hasNext()) {
                     System.out.println(iter.next());
                 }
@@ -1327,6 +1346,65 @@ public class BookieShell implements Tool {
     }
 
     /**
+     * Setter and Getter for LostBookieRecoveryDelay value (in seconds) in Zookeeper
+     */
+    class LostBookieRecoveryDelayCmd extends MyCommand {
+        Options opts = new Options();
+
+        public LostBookieRecoveryDelayCmd() {
+            super(CMD_LOSTBOOKIERECOVERYDELAY);
+            opts.addOption("g", "get", false, "Get LostBookieRecoveryDelay value (in seconds)");
+            opts.addOption("s", "set", true, "Set LostBookieRecoveryDelay value (in seconds)");
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Setter and Getter for LostBookieRecoveryDelay value (in seconds) in Zookeeper";
+        }
+
+        @Override
+        String getUsage() {
+            return "lostbookierecoverydelay [-get|-set <value>]";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            boolean getter = cmdLine.hasOption("g");
+            boolean setter = cmdLine.hasOption("s");
+
+            if ((!getter && !setter) || (getter && setter)) {
+                LOG.error("One and only one of -get and -set must be specified");
+                printUsage();
+                return 1;
+            }
+            ClientConfiguration adminConf = new ClientConfiguration(bkConf);
+            BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
+            try {
+                if (getter) {
+                    int lostBookieRecoveryDelay = admin.getLostBookieRecoveryDelay();
+                    LOG.info("LostBookieRecoveryDelay value in ZK: {}", String.valueOf(lostBookieRecoveryDelay));
+                } else {
+                    int lostBookieRecoveryDelay = Integer.parseInt(cmdLine.getOptionValue("set"));
+                    admin.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+                    LOG.info("Successfully set LostBookieRecoveryDelay value in ZK: {}",
+                            String.valueOf(lostBookieRecoveryDelay));
+                }
+            } finally {
+                if (admin != null) {
+                    admin.close();
+                }
+            }
+            return 0;
+        }
+    }
+    
+    
+    /**
      * Print which node has the auditor lock
      */
     class WhoIsAuditorCmd extends MyCommand {
@@ -1825,6 +1903,91 @@ public class BookieShell implements Tool {
     }
 
     /**
+     * Command to trigger AuditTask by resetting lostBookieRecoveryDelay to its current value
+     */
+    class TriggerAuditCmd extends MyCommand {
+        Options opts = new Options();
+
+        TriggerAuditCmd() {
+            super(CMD_TRIGGERAUDIT);
+        }
+
+        @Override
+        String getDescription() {
+            return "Force trigger the Audit by resetting the lostBookieRecoveryDelay";
+        }
+
+        @Override
+        String getUsage() {
+            return CMD_TRIGGERAUDIT;
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            ClientConfiguration adminConf = new ClientConfiguration(bkConf);
+            BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
+            try {
+                admin.triggerAudit();
+            } finally {
+                if (admin != null) {
+                    admin.close();
+                }
+            }
+            return 0;
+        }
+    }
+    
+    /**
+     * Command to trigger AuditTask by resetting lostBookieRecoveryDelay and then make sure the 
+     * ledgers stored in the bookie are properly replicated.
+     */
+    class DecommissionBookieCmd extends MyCommand {
+        Options lOpts = new Options();
+        
+        DecommissionBookieCmd() {
+            super(CMD_DECOMMISSIONBOOKIE);
+        }
+
+        @Override
+        String getDescription() {
+            return "Force trigger the Audittask and make sure all the ledgers stored in the decommissioning bookie are replicated";
+        }
+
+        @Override
+        String getUsage() {
+            return CMD_DECOMMISSIONBOOKIE;
+        }
+
+        @Override
+        Options getOptions() {
+            return lOpts;
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            ClientConfiguration adminConf = new ClientConfiguration(bkConf);
+            BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
+            try {
+                BookieSocketAddress thisBookieAddress = Bookie.getBookieAddress(bkConf);
+                admin.decommissionBookie(thisBookieAddress);
+                return 0;
+            } catch (Exception e) {
+                LOG.error("Received exception in DecommissionBookieCmd ", e);
+                return -1;
+            } finally {
+                if (admin != null) {
+                    admin.close();
+                }
+            }
+        }
+    }
+    
+    /**
      * A facility for reporting update ledger progress.
      */
     public interface UpdateLedgerNotifier {
@@ -1855,7 +2018,10 @@ public class BookieShell implements Tool {
         commands.put(CMD_UPDATELEDGER, new UpdateLedgerCmd());
         commands.put(CMD_DELETELEDGER, new DeleteLedgerCmd());
         commands.put(CMD_BOOKIEINFO, new BookieInfoCmd());
+        commands.put(CMD_DECOMMISSIONBOOKIE, new DecommissionBookieCmd());
         commands.put(CMD_HELP, new HelpCmd());
+        commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new LostBookieRecoveryDelayCmd());  
+        commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 16d39ce..8cf8833 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -35,19 +35,35 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
 
+import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
 import org.apache.bookkeeper.client.BookKeeper.SyncOpenCallback;
 import org.apache.bookkeeper.client.LedgerFragmentReplicator.SingleFragmentCallback;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.replication.AuditorElector;
+import org.apache.bookkeeper.replication.BookieLedgerIndexer;
+import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -64,6 +80,8 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.AbstractFuture;
+
 /**
  * Admin client for BookKeeper clusters
  */
@@ -89,6 +107,14 @@ public class BookKeeperAdmin implements AutoCloseable {
      */
     private Random rand = new Random();
 
+    private LedgerManagerFactory mFactory;
+
+    /*
+     * underreplicationManager is not initialized as part of constructor use its
+     * getter (getUnderreplicationManager) so that it can be lazy-initialized
+     */
+    private LedgerUnderreplicationManager underreplicationManager;
+    
     /**
      * Constructor that takes in a ZooKeeper servers connect string so we know
      * how to connect to ZooKeeper to retrieve information about the BookKeeper
@@ -144,6 +170,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         bkc = new BookKeeper(conf, zk);
         ownsBK = true;
         this.lfr = new LedgerFragmentReplicator(bkc, NullStatsLogger.INSTANCE);
+        this.mFactory = bkc.ledgerManagerFactory;
     }
 
     /**
@@ -162,6 +189,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         ownsZK = false;
         this.bookiesPath = bkc.getConf().getZkAvailableBookiesPath();
         this.lfr = new LedgerFragmentReplicator(bkc, statsLogger);
+        this.mFactory = bkc.ledgerManagerFactory;
     }
 
     public BookKeeperAdmin(final BookKeeper bkc) {
@@ -1064,4 +1092,285 @@ public class BookKeeperAdmin implements AutoCloseable {
     public LedgerMetadata getLedgerMetadata(LedgerHandle lh) {
         return lh.getLedgerMetadata();
     }
+    
+    private LedgerUnderreplicationManager getUnderreplicationManager()
+            throws CompatibilityException, KeeperException, InterruptedException {
+        if (underreplicationManager == null) {
+            underreplicationManager = mFactory.newLedgerUnderreplicationManager();
+        }
+        return underreplicationManager;
+    }
+
+    /**
+     * Setter for LostBookieRecoveryDelay value (in seconds) in Zookeeper
+     * 
+     * @param lostBookieRecoveryDelay
+     *                              lostBookieRecoveryDelay value (in seconds) to set 
+     * @throws CompatibilityException
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws UnavailableException
+     */
+    public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay)
+            throws CompatibilityException, KeeperException, InterruptedException, UnavailableException {
+        LedgerUnderreplicationManager urlManager = getUnderreplicationManager();
+        urlManager.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+    }
+
+    /**
+     * returns the current LostBookieRecoveryDelay value (in seconds) in Zookeeper
+     * 
+     * @return
+     *          current lostBookieRecoveryDelay value (in seconds)
+     * @throws CompatibilityException
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws UnavailableException
+     */
+    public int getLostBookieRecoveryDelay()
+            throws CompatibilityException, KeeperException, InterruptedException, UnavailableException {
+        LedgerUnderreplicationManager urlManager = getUnderreplicationManager();
+        return urlManager.getLostBookieRecoveryDelay();
+    }
+
+    /**
+     * trigger AuditTask by resetting lostBookieRecoveryDelay to its current
+     * value. If Autorecovery is not enabled or if there is no Auditor then this
+     * method will throw UnavailableException.
+     * 
+     * @throws CompatibilityException
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws UnavailableException
+     * @throws IOException
+     */
+    public void triggerAudit()
+            throws CompatibilityException, KeeperException, InterruptedException, UnavailableException, IOException {
+        LedgerUnderreplicationManager urlManager = getUnderreplicationManager();
+        if (!urlManager.isLedgerReplicationEnabled()) {
+            LOG.error("Autorecovery is disabled. So giving up!");
+            throw new UnavailableException("Autorecovery is disabled. So giving up!");
+        }
+        
+        BookieSocketAddress auditorId = AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.conf), zk);
+        if (auditorId == null) {
+            LOG.error("No auditor elected, though Autorecovery is enabled. So giving up.");
+            throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up.");
+        }
+
+        int previousLostBookieRecoveryDelayValue = urlManager.getLostBookieRecoveryDelay();
+        LOG.info("Resetting LostBookieRecoveryDelay value: {}, to kickstart audit task",
+                previousLostBookieRecoveryDelayValue);
+        urlManager.setLostBookieRecoveryDelay(previousLostBookieRecoveryDelayValue);
+    }
+    
+    /**
+     * Triggers AuditTask by resetting lostBookieRecoveryDelay and then make
+     * sure the ledgers stored in the given decommissioning bookie are properly
+     * replicated and they are not underreplicated because of the given bookie.
+     * This method waits untill there are no underreplicatedledgers because of this 
+     * bookie. If the given Bookie is not shutdown yet, then it will throw 
+     * BKIllegalOpException.
+     * 
+     * @param bookieAddress
+     *            address of the decommissioning bookie
+     * @throws CompatibilityException
+     * @throws UnavailableException
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws IOException
+     * @throws BKAuditException
+     * @throws TimeoutException
+     * @throws BKException 
+     */
+    public void decommissionBookie(BookieSocketAddress bookieAddress)
+            throws CompatibilityException, UnavailableException, KeeperException, InterruptedException, IOException,
+            BKAuditException, TimeoutException, BKException {
+        if (getAvailableBookies().contains(bookieAddress) || getReadOnlyBookies().contains(bookieAddress)) {
+            LOG.error("Bookie: {} is not shutdown yet", bookieAddress);
+            throw BKException.create(BKException.Code.IllegalOpException);
+        }
+        
+        triggerAudit();
+
+        /*
+         * Sleep for 30 secs, so that Auditor gets chance to trigger its
+         * force audittask and let the underreplicationmanager process
+         * to do its replication process
+         */
+        Thread.sleep(30 * 1000);
+        
+        /*
+         * get the collection of the ledgers which are stored in this
+         * bookie, by making a call to
+         * bookieLedgerIndexer.getBookieToLedgerIndex.
+         */
+        
+        BookieLedgerIndexer bookieLedgerIndexer = new BookieLedgerIndexer(bkc.ledgerManager);
+        Map<String, Set<Long>> bookieToLedgersMap = bookieLedgerIndexer.getBookieToLedgerIndex();
+        Set<Long> ledgersStoredInThisBookie = bookieToLedgersMap.get(bookieAddress.toString());
+        if ((ledgersStoredInThisBookie != null) && (!ledgersStoredInThisBookie.isEmpty())) {
+            /*
+             * wait untill all the ledgers are replicated to other
+             * bookies by making sure that these ledgers metadata don't
+             * contain this bookie as part of their ensemble.
+             */
+            waitForLedgersToBeReplicated(ledgersStoredInThisBookie, bookieAddress, bkc.ledgerManager);
+        }
+
+        // for double-checking, check if any ledgers are listed as underreplicated because of this bookie
+        Predicate<List<String>> predicate = replicasList -> replicasList.contains(bookieAddress.toString());
+        Iterator<Long> urLedgerIterator = underreplicationManager.listLedgersToRereplicate(predicate);
+        if (urLedgerIterator.hasNext()) {
+            //if there are any then wait and make sure those ledgers are replicated properly
+            LOG.info("Still in some underreplicated ledgers metadata, this bookie is part of its ensemble. "
+                    + "Have to make sure that those ledger fragments are rereplicated");
+            List<Long> urLedgers = new ArrayList<>();
+            urLedgerIterator.forEachRemaining(urLedgers::add);
+            waitForLedgersToBeReplicated(urLedgers, bookieAddress, bkc.ledgerManager);
+        }
+    }
+
+    private void waitForLedgersToBeReplicated(Collection<Long> ledgers, BookieSocketAddress thisBookieAddress,
+            LedgerManager ledgerManager) throws InterruptedException, TimeoutException {
+        int maxSleepTimeInBetweenChecks = 10 * 60 * 1000; // 10 minutes
+        int sleepTimePerLedger = 10 * 1000; // 10 secs
+        Predicate<Long> validateBookieIsNotPartOfEnsemble = ledgerId -> !areEntriesOfLedgerStoredInTheBookie(ledgerId,
+                thisBookieAddress, ledgerManager);
+        while (!ledgers.isEmpty()) {
+            LOG.info("Count of Ledgers which need to be rereplicated: {}", ledgers.size());
+            int sleepTimeForThisCheck = ledgers.size() * sleepTimePerLedger > maxSleepTimeInBetweenChecks
+                    ? maxSleepTimeInBetweenChecks : ledgers.size() * sleepTimePerLedger;
+            Thread.sleep(sleepTimeForThisCheck);
+            LOG.debug("Making sure following ledgers replication to be completed: {}", ledgers);
+            ledgers.removeIf(validateBookieIsNotPartOfEnsemble);
+        }
+    }
+
+    private boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
+            LedgerManager ledgerManager) {
+        ReadMetadataCallback cb = new ReadMetadataCallback(ledgerId);
+        ledgerManager.readLedgerMetadata(ledgerId, cb);
+        try {
+            LedgerMetadata ledgerMetadata = cb.get();
+            Collection<ArrayList<BookieSocketAddress>> ensemblesOfSegments = ledgerMetadata.getEnsembles().values();
+            Iterator<ArrayList<BookieSocketAddress>> ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
+            ArrayList<BookieSocketAddress> ensemble;
+            int segmentNo = 0;
+            while (ensemblesOfSegmentsIterator.hasNext()) {
+                ensemble = ensemblesOfSegmentsIterator.next();
+                if (ensemble.contains(bookieAddress)) {
+                    if (areEntriesOfSegmentStoredInTheBookie(ledgerMetadata, bookieAddress, segmentNo++)) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        } catch (InterruptedException | ExecutionException e) {
+            if (e.getCause() != null
+                    && e.getCause().getClass().equals(BKException.BKNoSuchLedgerExistsException.class)) {
+                LOG.debug("Ledger: {} has been deleted", ledgerId);
+                return false;
+            } else {
+                LOG.error("Got exception while trying to read LedgerMeatadata of " + ledgerId, e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private boolean areEntriesOfSegmentStoredInTheBookie(LedgerMetadata ledgerMetadata,
+            BookieSocketAddress bookieAddress, int segmentNo) {
+        boolean isLedgerClosed = ledgerMetadata.isClosed();
+        int ensembleSize = ledgerMetadata.getEnsembleSize();
+        int writeQuorumSize = ledgerMetadata.getWriteQuorumSize();
+
+        List<Entry<Long, ArrayList<BookieSocketAddress>>> segments = new LinkedList<Entry<Long, ArrayList<BookieSocketAddress>>>(
+                ledgerMetadata.getEnsembles().entrySet());
+
+        boolean lastSegment = (segmentNo == (segments.size() - 1));
+        
+        /*
+         * Checking the last segment of the ledger can be complicated in
+         * some cases. In the case that the ledger is closed, we can just
+         * check the fragments of the segment as normal, except in the case
+         * that no entry was ever written, to the ledger, in which case we
+         * check no fragments.
+         * 
+         * Following the same approach as in LedgerChecker.checkLedger
+         */
+        if (lastSegment && isLedgerClosed && (ledgerMetadata.getLastEntryId() < segments.get(segmentNo).getKey())) {
+            return false;
+        }
+
+        /*
+         * if ensembleSize is equal to writeQuorumSize, then ofcourse all
+         * the entries of this segment are supposed to be stored in this
+         * bookie. If this is last segment of the ledger and if the ledger
+         * is not closed (this is a corner case), then we have to return
+         * true. For more info. Check BOOKKEEPER-237 and BOOKKEEPER-325.
+         */
+        if ((lastSegment && !isLedgerClosed) || (ensembleSize == writeQuorumSize)) {
+            return true;
+        }
+
+        /*
+         * the following check is required because ensembleSize can be
+         * greater than writeQuorumSize and in this case if there are only
+         * couple of entries then based on RoundRobinDistributionSchedule
+         * there might not be any entry copy in this bookie though this
+         * bookie is part of the ensemble of this segment. If no entry is
+         * stored in this bookie then we should return false, because
+         * ReplicationWorker wont take care of fixing the ledgerMetadata of
+         * this segment in this case.
+         * 
+         * if ensembleSize > writeQuorumSize, then in LedgerFragment.java
+         * firstEntryID may not be equal to firstStoredEntryId lastEntryId
+         * may not be equalto lastStoredEntryId. firstStoredEntryId and
+         * lastStoredEntryId will be LedgerHandle.INVALID_ENTRY_ID, if no
+         * entry of this segment stored in this bookie. In this case
+         * LedgerChecker.verifyLedgerFragment will not consider it as
+         * unavailable/bad fragment though this bookie is part of the
+         * ensemble of the segment and it is down.
+         */
+        DistributionSchedule distributionSchedule = new RoundRobinDistributionSchedule(
+                ledgerMetadata.getWriteQuorumSize(), ledgerMetadata.getAckQuorumSize(),
+                ledgerMetadata.getEnsembleSize());
+        ArrayList<BookieSocketAddress> currentSegmentEnsemble = segments.get(segmentNo).getValue();
+        int thisBookieIndexInCurrentEnsemble = currentSegmentEnsemble.indexOf(bookieAddress);
+        long firstEntryId = segments.get(segmentNo).getKey();
+        long lastEntryId = lastSegment ? ledgerMetadata.getLastEntryId() : segments.get(segmentNo + 1).getKey() - 1;
+        long firstStoredEntryId = LedgerHandle.INVALID_ENTRY_ID;
+        long firstEntryIter = firstEntryId;
+        // following the same approach followed in LedgerFragment.getFirstStoredEntryId()
+        for (int i = 0; i < ensembleSize && firstEntryIter <= lastEntryId; i++) {
+            if (distributionSchedule.hasEntry(firstEntryIter, thisBookieIndexInCurrentEnsemble)) {
+                firstStoredEntryId = firstEntryIter;
+                break;
+            } else {
+                firstEntryIter++;
+            }
+        }
+        return firstStoredEntryId != LedgerHandle.INVALID_ENTRY_ID;
+    }
+
+    static class ReadMetadataCallback extends AbstractFuture<LedgerMetadata>
+            implements GenericCallback<LedgerMetadata> {
+        final long ledgerId;
+
+        ReadMetadataCallback(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
+        long getLedgerId() {
+            return ledgerId;
+        }
+
+        public void operationComplete(int rc, LedgerMetadata result) {
+            if (rc != 0) {
+                setException(BKException.create(rc));
+            } else {
+                set(result);
+            }
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index 4266c90..7f58131 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -88,6 +88,11 @@ public class LedgerChecker {
         long firstStored = fragment.getFirstStoredEntryId();
         long lastStored = fragment.getLastStoredEntryId();
 
+        // Because of this if block, a fragment is considered available/not-bad,
+        // even if its bookie is down, when both firstStored and lastStored
+        // are LedgerHandle.INVALID_ENTRY_ID.
+        // BookieShell.DecommissionBookieCmd.areEntriesOfSegmentStoredInTheBookie uses the same logic,
+        // so any change made here must be mirrored in BookieShell as well.
         if (firstStored == LedgerHandle.INVALID_ENTRY_ID) {
             if (lastStored != LedgerHandle.INVALID_ENTRY_ID) {
                 throw new InvalidFragmentException();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 2641de9..92fc63e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -241,7 +241,7 @@ public class LedgerMetadata {
         state = LedgerMetadataFormat.State.CLOSED;
     }
 
-    void addEnsemble(long startEntryId, ArrayList<BookieSocketAddress> ensemble) {
+    public void addEnsemble(long startEntryId, ArrayList<BookieSocketAddress> ensemble) {
         assert ensembles.isEmpty() || startEntryId >= ensembles.lastKey();
 
         ensembles.put(startEntryId, ensemble);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index d006895..014c1a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -21,6 +21,8 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.ReplicationException;
 
 import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
 
 /**
  * Interface for marking ledgers which need to be rereplicated
@@ -42,11 +44,18 @@ public interface LedgerUnderreplicationManager {
 
     /**
      * Get a list of all the ledgers which have been
-     * marked for rereplication.
-     *
+     * marked for rereplication, filtered by the predicate on the missing replicas list.
+     * 
+     * Missing replicas list of an underreplicated ledger is the list of the bookies which are part of 
+     * the ensemble of this ledger and are currently unavailable/down.
+     * 
+     * If filtering is not needed then it is suggested to pass null for predicate,
+     * otherwise it will read the content of the ZNode to decide on filtering.
+     * 
+     * @param predicate filter to use while listing under replicated ledgers. 'null' if filtering is not required
      * @return an iterator which returns ledger ids
      */
-    Iterator<Long> listLedgersToRereplicate();
+    Iterator<Long> listLedgersToRereplicate(Predicate<List<String>> predicate);
 
     /**
      * Acquire a underreplicated ledger for rereplication. The ledger
@@ -116,4 +125,41 @@ public interface LedgerUnderreplicationManager {
      */
     void notifyLedgerReplicationEnabled(GenericCallback<Void> cb)
             throws ReplicationException.UnavailableException;
+
+    /**
+     * Creates the zNode for lostBookieRecoveryDelay with the specified value and returns true.
+     * If the zNode already exists, nothing is created and false is returned.
+     *
+     * @param lostBookieRecoveryDelay value to store in the newly created zNode
+     * @return
+     *      true if the zNode for lostBookieRecoveryDelay was created, false if it already existed
+     * @throws ReplicationException.UnavailableException if the zNode could not be created for any other reason
+     */
+    boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay)
+            throws ReplicationException.UnavailableException;
+
+    /**
+     * Stores the given value in the lostBookieRecoveryDelay znode.
+     *
+     * @param lostBookieRecoveryDelay new value to store
+     * @throws ReplicationException.UnavailableException if the value could not be stored
+     */
+    void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws ReplicationException.UnavailableException;
+
+    /**
+     * Reads the currently stored lostBookieRecoveryDelay value.
+     *
+     * @return the int value of lostBookieRecoveryDelay
+     * @throws ReplicationException.UnavailableException if the value could not be read
+     */
+    int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableException;
+
+    /**
+     * Receive notification asynchronously when the lostBookieRecoveryDelay value is changed.
+     *
+     * @param cb callback to invoke when the value changes
+     * @throws ReplicationException.UnavailableException if the notification could not be registered
+     */
+    void notifyLostBookieRecoveryDelayChanged(GenericCallback<Void> cb)
+            throws ReplicationException.UnavailableException;
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index e9d48f3..e56ee30 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -49,7 +50,9 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +60,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.protobuf.TextFormat;
-import org.apache.zookeeper.data.ACL;
 
 /**
  * ZooKeeper implementation of underreplication manager.
@@ -101,6 +103,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     private final String urLockPath;
     private final String layoutZNode;
     private final AbstractConfiguration conf;
+    private final String lostBookieRecoveryDelayZnode;
     private final ZooKeeper zkc;
     private final SubTreeCache subTreeCache;
 
@@ -112,7 +115,8 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         urLedgerPath = basePath
                 + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
         urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
-
+        lostBookieRecoveryDelayZnode = basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
+        
         idExtractionPattern = Pattern.compile("urL(\\d+)$");
         this.zkc = zkc;
         this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider() {
@@ -341,8 +345,21 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         }
     }
 
+    /**
+     * Get a list of all the ledgers which have been
+     * marked for rereplication, filtered by the predicate on the missing replicas list.
+     *
+     * Missing replicas list of an underreplicated ledger is the list of the bookies which are part of
+     * the ensemble of this ledger and are currently unavailable/down.
+     * 
+     * If filtering is not needed then it is suggested to pass null for predicate,
+     * otherwise it will read the content of the ZNode to decide on filtering.
+     * 
+     * @param predicate filter to use while listing under replicated ledgers. 'null' if filtering is not required.
+     * @return an iterator which returns ledger ids
+     */
     @Override
-    public Iterator<Long> listLedgersToRereplicate() {
+    public Iterator<Long> listLedgersToRereplicate(final Predicate<List<String>> predicate) {
         final Queue<String> queue = new LinkedList<String>();
         queue.add(urLedgerPath);
 
@@ -363,12 +380,20 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                 while (queue.size() > 0 && curBatch.size() == 0) {
                     String parent = queue.remove();
                     try {
-                        for (String c : zkc.getChildren(parent,false)) {
-                            String child = parent + "/" + c;
-                            if (c.startsWith("urL")) {
-                                curBatch.add(getLedgerId(child));
-                            } else {
-                                queue.add(child);
+                        for (String c : zkc.getChildren(parent, false)) {
+                            try {
+                                String child = parent + "/" + c;
+                                if (c.startsWith("urL")) {
+                                    long ledgerId = getLedgerId(child);
+                                    if ((predicate == null)
+                                            || predicate.test(getLedgerUnreplicationInfo(ledgerId).getReplicaList())) {
+                                        curBatch.add(ledgerId);
+                                    }
+                                } else {
+                                    queue.add(child);
+                                }
+                            } catch (KeeperException.NoNodeException nne) {
+                                // ignore
                             }
                         }
                     } catch (InterruptedException ie) {
@@ -694,4 +719,83 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             zkc.delete(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), -1);
         }
     }
+
+    @Override
+    public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws UnavailableException {
+        LOG.debug("initializeLostBookieRecoveryDelay()");
+        try {
+            zkc.create(lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException.NodeExistsException ke) {
+            LOG.info(
+                    "lostBookieRecoveryDelay Znode is already present, so using existing lostBookieRecoveryDelay Znode value");
+            return false;
+        } catch (KeeperException ke) {
+            LOG.error("Error while initializing LostBookieRecoveryDelay", ke);
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        }
+        return true;
+    }
+
+    @Override
+    public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws UnavailableException {
+        LOG.debug("setLostBookieRecoveryDelay()");
+        try {
+            if (zkc.exists(lostBookieRecoveryDelayZnode, false) != null) {
+                zkc.setData(lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
+                        -1);
+            } else {
+                zkc.create(lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
+                        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+        } catch (KeeperException ke) {
+            LOG.error("Error while setting LostBookieRecoveryDelay ", ke);
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        }
+    }
+
+    @Override
+    public int getLostBookieRecoveryDelay() throws UnavailableException {
+        LOG.debug("getLostBookieRecoveryDelay()");
+        try {
+            byte[] data = zkc.getData(lostBookieRecoveryDelayZnode, false, null);
+            return Integer.parseInt(new String(data, UTF_8));
+        } catch (KeeperException ke) {
+            LOG.error("Error while getting LostBookieRecoveryDelay ", ke);
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        }
+    }
+
+    @Override
+    public void notifyLostBookieRecoveryDelayChanged(GenericCallback<Void> cb) throws UnavailableException {
+        LOG.debug("notifyLostBookieRecoveryDelayChanged()");
+        Watcher w = new Watcher() {
+            public void process(WatchedEvent e) {
+                if (e.getType() == Watcher.Event.EventType.NodeDataChanged) {
+                    cb.operationComplete(0, null);
+                }
+            }
+        };
+        try {
+            if (null == zkc.exists(lostBookieRecoveryDelayZnode, w)) {
+                cb.operationComplete(0, null);
+                return;
+            }
+        } catch (KeeperException ke) {
+            LOG.error("Error while checking the state of lostBookieRecoveryDelay", ke);
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 3df941f..054e09d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -20,8 +20,19 @@
  */
 package org.apache.bookkeeper.replication;
 
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -41,35 +52,21 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
-import org.apache.bookkeeper.replication.ReplicationStats;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.zookeeper.AsyncCallback;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.SettableFuture;
-
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * Auditor is a single entity in the entire Bookie cluster and will be watching
@@ -101,7 +98,8 @@ public class Auditor implements BookiesListener {
     private final Counter numDelayedBookieAuditsCancelled;
     private volatile Future<?> auditTask;
     private Set<String> bookiesToBeAudited = Sets.newHashSet();
-
+    private volatile int lostBookieRecoveryDelayBeforeChange;
+    
     public Auditor(final String bookieIdentifier, ServerConfiguration conf,
                    ZooKeeper zkc, StatsLogger statsLogger) throws UnavailableException {
         this.conf = conf;
@@ -149,6 +147,15 @@ public class Auditor implements BookiesListener {
             LOG.info("AuthProvider used by the Auditor is "+clientConfiguration.getClientAuthProviderFactoryClass());
             this.bkc = new BookKeeper(clientConfiguration, zkc);
             this.admin = new BookKeeperAdmin(bkc, statsLogger);
+            if (this.ledgerUnderreplicationManager
+                    .initializeLostBookieRecoveryDelay(conf.getLostBookieRecoveryDelay())) {
+                LOG.info("Initializing lostBookieRecoveryDelay zNode to the conif value: {}",
+                        conf.getLostBookieRecoveryDelay());
+            } else {
+                LOG.info(
+                        "Valid lostBookieRecoveryDelay zNode is available, so not creating lostBookieRecoveryDelay zNode as part of Auditor initialization ");
+            }
+            lostBookieRecoveryDelayBeforeChange = this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
         } catch (CompatibilityException ce) {
             throw new UnavailableException(
                     "CompatibilityException while initializing Auditor", ce);
@@ -191,7 +198,8 @@ public class Auditor implements BookiesListener {
                 public void run() {
                     try {
                         waitIfLedgerReplicationDisabled();
-
+                        int lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager
+                                .getLostBookieRecoveryDelay();
                         List<String> availableBookies = getAvailableBookies();
 
                         // casting to String, as knownBookies and availableBookies
@@ -220,7 +228,7 @@ public class Auditor implements BookiesListener {
                         }
 
                         knownBookies.removeAll(bookiesToBeAudited);
-                        if (conf.getLostBookieRecoveryDelay() == 0) {
+                        if (lostBookieRecoveryDelay == 0) {
                             startAudit(false);
                             bookiesToBeAudited.clear();
                             return;
@@ -246,9 +254,9 @@ public class Auditor implements BookiesListener {
                                     auditTask = null;
                                     bookiesToBeAudited.clear();
                                 }
-                            }, conf.getLostBookieRecoveryDelay(), TimeUnit.SECONDS);
+                            }, lostBookieRecoveryDelay, TimeUnit.SECONDS);
                             numBookieAuditsDelayed.inc();
-                            LOG.info("Delaying bookie audit by " + conf.getLostBookieRecoveryDelay()
+                            LOG.info("Delaying bookie audit by " + lostBookieRecoveryDelay
                                      + "secs for " + bookiesToBeAudited.toString());
                         }
                     } catch (BKException bke) {
@@ -263,6 +271,64 @@ public class Auditor implements BookiesListener {
             });
     }
 
+    synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
+        if (executor.isShutdown()) {
+            SettableFuture<Void> f = SettableFuture.<Void> create();
+            f.setException(new BKAuditException("Auditor shutting down"));
+            return f;
+        }
+        return executor.submit(new Runnable() {
+            int lostBookieRecoveryDelay = -1;
+            public void run() {
+                try {
+                    waitIfLedgerReplicationDisabled();
+                    lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager
+                            .getLostBookieRecoveryDelay();
+                    // if there is pending auditTask, cancel the task. So that it can be rescheduled
+                    // after new lostBookieRecoveryDelay period
+                    if (auditTask != null) {
+                        LOG.info("lostBookieRecoveryDelay period has been changed so canceling the pending AuditTask");
+                        auditTask.cancel(false);                        
+                        numDelayedBookieAuditsCancelled.inc();
+                    }
+
+                    // if lostBookieRecoveryDelay is set to its previous value then consider it as
+                    // signal to trigger the Audit immediately.
+                    if ((lostBookieRecoveryDelay == 0)
+                            || (lostBookieRecoveryDelay == lostBookieRecoveryDelayBeforeChange)) {
+                        LOG.info(
+                                "lostBookieRecoveryDelay has been set to 0 or reset to its previos value, so starting AuditTask. "
+                                + "Current lostBookieRecoveryDelay: {}, previous lostBookieRecoveryDelay: {}",
+                                lostBookieRecoveryDelay, lostBookieRecoveryDelayBeforeChange);
+                        startAudit(false);
+                        auditTask = null;
+                        bookiesToBeAudited.clear();                        
+                    } else if (auditTask != null) {
+                        LOG.info("lostBookieRecoveryDelay has been set to {}, so rescheduling AuditTask accordingly",
+                                lostBookieRecoveryDelay);
+                        auditTask = executor.schedule(new Runnable() {
+                            public void run() {
+                                startAudit(false);
+                                auditTask = null;
+                                bookiesToBeAudited.clear();
+                            }
+                        }, lostBookieRecoveryDelay, TimeUnit.SECONDS);
+                        numBookieAuditsDelayed.inc();
+                    }                    
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    LOG.error("Interrupted while for LedgersReplication to be enabled ", ie);
+                } catch (UnavailableException ue) {
+                    LOG.error("Exception while reading from ZK", ue);
+                } finally{
+                    if (lostBookieRecoveryDelay != -1) {
+                        lostBookieRecoveryDelayBeforeChange = lostBookieRecoveryDelay;
+                    }
+                }
+            }
+        });
+    }
+
     public void start() {
         LOG.info("I'm starting as Auditor Bookie. ID: {}", bookieIdentifier);
         // on startup watching available bookie and based on the
@@ -317,6 +383,14 @@ public class Auditor implements BookiesListener {
                 LOG.error("Couldn't get bookie list, exiting", bke);
                 submitShutdownTask();
             }
+            
+            try {
+                this.ledgerUnderreplicationManager
+                        .notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
+            } catch (UnavailableException ue) {
+                LOG.error("Exception while registering for LostBookieRecoveryDelay change notification", ue);
+                submitShutdownTask();
+            }
 
             long bookieCheckInterval = conf.getAuditorPeriodicBookieCheckInterval();
             if (bookieCheckInterval == 0) {
@@ -330,6 +404,19 @@ public class Auditor implements BookiesListener {
         }
     }
 
+    private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> {
+        @Override
+        public void operationComplete(int rc, Void result) {
+            try {
+                Auditor.this.ledgerUnderreplicationManager
+                        .notifyLostBookieRecoveryDelayChanged(LostBookieRecoveryDelayChangedCb.this);
+            } catch (UnavailableException ae) {
+                LOG.error("Exception while registering for a LostBookieRecoveryDelay notification", ae);
+            }
+            Auditor.this.submitLostBookieRecoveryDelayChangedEvent();
+        }
+    }
+    
     private void waitIfLedgerReplicationDisabled() throws UnavailableException,
             InterruptedException {
         ReplicationEnableCb cb = new ReplicationEnableCb();
@@ -673,4 +760,11 @@ public class Auditor implements BookiesListener {
             }
         };
 
+    int getLostBookieRecoveryDelayBeforeChange() {
+        return lostBookieRecoveryDelayBeforeChange; // lostBookieRecoveryDelay value last recorded by this Auditor
+    }
+
+    Future<?> getAuditTask() {
+        return auditTask; // future of the delayed audit; reset to null once that audit has run
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
index 32a03e2..2ff7b94 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
@@ -64,7 +64,7 @@ public abstract class ReplicationException extends Exception {
     /**
      * Exception while auditing bookie-ledgers
     */
-    static class BKAuditException extends ReplicationException {
+    public static class BKAuditException extends ReplicationException {
         private static final long serialVersionUID = 95551905L;
 
         BKAuditException(String message, Throwable cause) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index bae7715..362e1e6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -40,6 +40,7 @@ public class BookKeeperConstants {
     public static final String UNDER_REPLICATION_NODE = "underreplication";
     public static final String UNDER_REPLICATION_LOCK = "locks";
     public static final String DISABLE_NODE = "disable";
+    public static final String LOSTBOOKIERECOVERYDELAY_NODE = "lostBookieRecoveryDelay";
     public static final String DEFAULT_ZK_LEDGERS_ROOT_PATH = "/ledgers";
     public static final String LAYOUT_ZNODE = "LAYOUT";
     public static final String INSTANCEID = "INSTANCEID";
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
new file mode 100644
index 0000000..b197353
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -0,0 +1,230 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Iterator;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.BKException.BKIllegalOpException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BookKeeperAdminTest.class);
+    private DigestType digestType = DigestType.CRC32;
+    private static final String PASSWORD = "testPasswd";
+    private static final int numOfBookies = 6;
+    private final int lostBookieRecoveryDelayInitValue = 1800;
+
+    public BookKeeperAdminTest() {
+        super(numOfBookies);
+        baseConf.setAutoRecoveryDaemonEnabled(true);
+        baseConf.setLostBookieRecoveryDelay(lostBookieRecoveryDelayInitValue);
+        baseConf.setOpenLedgerRereplicationGracePeriod(String.valueOf(30000));
+    }
+
+    @Test(timeout = 60000)
+    public void testLostBookieRecoveryDelayValue() throws Exception {
+        BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
+        assertEquals("LostBookieRecoveryDelay", lostBookieRecoveryDelayInitValue, bkAdmin.getLostBookieRecoveryDelay());
+        int newLostBookieRecoveryDelayValue = 2400;
+        bkAdmin.setLostBookieRecoveryDelay(newLostBookieRecoveryDelayValue);
+        assertEquals("LostBookieRecoveryDelay", newLostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay());
+        assertEquals("LostBookieRecoveryDelay", newLostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay());
+        newLostBookieRecoveryDelayValue = 3000;
+        bkAdmin.setLostBookieRecoveryDelay(newLostBookieRecoveryDelayValue);
+        assertEquals("LostBookieRecoveryDelay", newLostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay());
+        bkAdmin.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testTriggerAudit() throws Exception {
+        ZkLedgerUnderreplicationManager urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+        BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
+        int lostBookieRecoveryDelayValue = bkAdmin.getLostBookieRecoveryDelay();
+        urLedgerMgr.disableLedgerReplication();
+        try {
+            bkAdmin.triggerAudit();
+            Assert.fail("Trigger Audit should have failed because LedgerReplication is disabled");
+        } catch (UnavailableException une) {
+            // expected
+        }
+        assertEquals("LostBookieRecoveryDelay", lostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay());
+        urLedgerMgr.enableLedgerReplication();
+        bkAdmin.triggerAudit();
+        assertEquals("LostBookieRecoveryDelay", lostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay());
+        long ledgerId = 1L;
+        LedgerHandle ledgerHandle = bkc.createLedgerAdv(ledgerId, numBookies, numBookies, numBookies, digestType,
+                PASSWORD.getBytes(), null);
+        ledgerHandle.addEntry(0, "data".getBytes());
+        ledgerHandle.close();
+
+        killBookie(1);
+        /*
+         * since lostBookieRecoveryDelay is set, when a bookie dies, the
+         * Audit process does not start immediately. But calling triggerAudit
+         * forces the audit process to run.
+         */
+        bkAdmin.triggerAudit();
+        Thread.sleep(500);
+        Iterator<Long> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
+        assertTrue("There are supposed to be underreplicatedledgers", ledgersToRereplicate.hasNext());
+        assertEquals("Underreplicated ledgerId", ledgerId, ledgersToRereplicate.next().longValue());
+        bkAdmin.close();
+    }
+
+    @Test(timeout = 480000)
+    public void testDecommissionBookie() throws Exception {
+        ZkLedgerUnderreplicationManager urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+        BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
+
+        int numOfLedgers = 2 * numOfBookies;
+        int numOfEntries = 2 * numOfBookies;
+        for (int i = 0; i < numOfLedgers; i++) {
+            LedgerHandle lh = bkc.createLedger(3, 2, digestType, PASSWORD.getBytes());
+            for (int j = 0; j < numOfEntries; j++) {
+                lh.addEntry("entry".getBytes());
+            }
+            lh.close();
+        }
+        /*
+         * create ledgers having empty segments (segment with no entries)
+         */
+        for (int i = 0; i < numOfLedgers; i++) {
+            LedgerHandle emptylh = bkc.createLedger(3, 2, digestType, PASSWORD.getBytes());
+            emptylh.close();
+        }
+        
+        try {
+            /*
+             * if we try to call decommissionBookie for a bookie which is not
+             * shutdown, then it should throw BKIllegalOpException
+             */
+            bkAdmin.decommissionBookie(bs.get(0).getLocalAddress());
+            fail("Expected BKIllegalOpException because that bookie is not shutdown yet");
+        } catch (BKIllegalOpException bkioexc) {
+            // expected IllegalException
+        }
+        
+        ServerConfiguration killedBookieConf = killBookie(1);
+        /*
+         * this decommissionBookie should make sure that there are no
+         * underreplicated ledgers because of this bookie
+         */
+        bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
+        bkAdmin.triggerAudit();
+        Thread.sleep(500);
+        Iterator<Long> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
+        if (ledgersToRereplicate.hasNext()) {
+            while (ledgersToRereplicate.hasNext()) {
+                Long ledgerId = ledgersToRereplicate.next();
+                LOG.error("Ledger: {} is underreplicated which is not expected", ledgerId);
+            }
+            fail("There are not supposed to be any underreplicatedledgers");
+        }
+        
+        killedBookieConf = killBookie(0);
+        bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
+        bkAdmin.triggerAudit();
+        Thread.sleep(500);
+        ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
+        if (ledgersToRereplicate.hasNext()) {
+            while (ledgersToRereplicate.hasNext()) {
+                Long ledgerId = ledgersToRereplicate.next();
+                LOG.error("Ledger: {} is underreplicated which is not expected", ledgerId);
+            }
+            fail("There are not supposed to be any underreplicatedledgers");
+        }
+        bkAdmin.close();
+    }
+
+    @Test(timeout = 240000)
+    public void testDecommissionForLedgersWithMultipleSegmentsAndNotWriteClosed() throws Exception {
+        ZkLedgerUnderreplicationManager urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+        BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
+        int numOfEntries = 2 * numOfBookies;
+
+        LedgerHandle lh1 = bkc.createLedgerAdv(1L, numBookies, 3, 3, digestType, PASSWORD.getBytes(), null);
+        LedgerHandle lh2 = bkc.createLedgerAdv(2L, numBookies, 3, 3, digestType, PASSWORD.getBytes(), null);
+        LedgerHandle lh3 = bkc.createLedgerAdv(3L, numBookies, 3, 3, digestType, PASSWORD.getBytes(), null);
+        LedgerHandle lh4 = bkc.createLedgerAdv(4L, numBookies, 3, 3, digestType, PASSWORD.getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            lh1.addEntry(j, "data".getBytes());
+            lh2.addEntry(j, "data".getBytes());
+            lh3.addEntry(j, "data".getBytes());
+            lh4.addEntry(j, "data".getBytes());
+        }
+
+        startNewBookie();
+
+        assertEquals("Number of Available Bookies", numOfBookies + 1, bkAdmin.getAvailableBookies().size());
+
+        ServerConfiguration killedBookieConf = killBookie(0);
+
+        /*
+         * since one of the bookies is killed, an ensemble change happens when the
+         * next write is made. So a new segment will be created for those 2 ledgers.
+         */
+        for (int j = numOfEntries; j < 2 * numOfEntries; j++) {
+            lh1.addEntry(j, "data".getBytes());
+            lh2.addEntry(j, "data".getBytes());
+        }
+        
+        /*
+         * Here lh1 and lh2 have multiple segments and are writeclosed. But lh3 and lh4 are
+         * not writeclosed and contain only one segment.
+         */
+        lh1.close();
+        lh2.close();
+        
+        /*
+         * If the last segment of the ledger is underreplicated and if the
+         * ledger is not closed then it will remain underreplicated for
+         * openLedgerRereplicationGracePeriod (by default 30 secs). For more
+         * info, check BOOKKEEPER-237 and BOOKKEEPER-325. But later
+         * ReplicationWorker will fence the ledger.
+         */
+        bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
+        bkAdmin.triggerAudit();
+        Thread.sleep(500);
+        Iterator<Long> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
+        if (ledgersToRereplicate.hasNext()) {
+            while (ledgersToRereplicate.hasNext()) {
+                Long ledgerId = ledgersToRereplicate.next();
+                LOG.error("Ledger: {} is underreplicated which is not expected", ledgerId);
+            }
+            fail("There are not supposed to be any underreplicatedledgers");
+        }
+        bkAdmin.close();
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index 103f1ce..f2a7316 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -20,6 +20,11 @@
  */
 package org.apache.bookkeeper.replication;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -33,6 +38,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -40,13 +46,19 @@ import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.test.MultiLedgerManagerTestCase;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -56,8 +68,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
-
 /**
  * Tests publishing of under replicated ledgers by the Auditor bookie node when
  * corresponding bookies identifes as not running
@@ -337,7 +347,7 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase {
                 .size());
 
         // wait for 5 seconds before starting the recovery work when a bookie fails
-        baseConf.setLostBookieRecoveryDelay(5);
+        urLedgerMgr.setLostBookieRecoveryDelay(5);
 
         // shutdown a non auditor bookie; choosing non-auditor to avoid another election
         String shutdownBookie = shutDownNonAuditorBookie();
@@ -395,6 +405,247 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase {
         _testDelayedAuditOfLostBookies();
     }
 
+    @Test(timeout=60000)
+    public void testRescheduleOfDelayedAuditOfLostBookiesToStartImmediately() throws Exception {
+     // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+        
+        LedgerHandle lh1 = createAndAddEntriesToLedger();
+        Long ledgerId = lh1.getId();
+        LOG.debug("Created ledger : " + ledgerId);
+        ledgerList.add(ledgerId);
+        lh1.close();
+
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList
+                .size());
+
+        // wait for 50 seconds before starting the recovery work when a bookie fails
+        urLedgerMgr.setLostBookieRecoveryDelay(50);
+
+        // shutdown a non auditor bookie; choosing non-auditor to avoid another election
+        String shutdownBookie = shutDownNonAuditorBookie();
+
+        LOG.debug("Waiting for ledgers to be marked as under replicated");
+        assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(4, TimeUnit.SECONDS));
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());
+
+        // set lostBookieRecoveryDelay to 0, so that it triggers AuditTask immediately
+        urLedgerMgr.setLostBookieRecoveryDelay(0);
+        
+        // wait for 1 second for the ledger to get reported as under replicated
+        assertTrue("audit of lost bookie isn't delayed", underReplicaLatch.await(1, TimeUnit.SECONDS));
+
+        assertTrue("Ledger is not marked as underreplicated:" + ledgerId,
+                urLedgerList.contains(ledgerId));
+        Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
+        String data = urLedgerData.get(ledgerId);
+        assertTrue("Bookie " + shutdownBookie
+                + "is not listed in the ledger as missing replica :" + data,
+                data.contains(shutdownBookie));
+    }
+    
+    @Test(timeout=60000)
+    public void testRescheduleOfDelayedAuditOfLostBookiesToStartLater() throws Exception {
+     // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+        
+        LedgerHandle lh1 = createAndAddEntriesToLedger();
+        Long ledgerId = lh1.getId();
+        LOG.debug("Created ledger : " + ledgerId);
+        ledgerList.add(ledgerId);
+        lh1.close();
+
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList
+                .size());
+
+        // wait for 3 seconds before starting the recovery work when a bookie fails
+        urLedgerMgr.setLostBookieRecoveryDelay(3);
+
+        // shutdown a non auditor bookie; choosing non-auditor to avoid another election
+        String shutdownBookie = shutDownNonAuditorBookie();
+
+        LOG.debug("Waiting for ledgers to be marked as under replicated");
+        assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());
+        
+        // set lostBookieRecoveryDelay to 4, so the pending AuditTask is rescheduled
+        urLedgerMgr.setLostBookieRecoveryDelay(4);
+        
+        // since we changed the BookieRecoveryDelay period to 4, the audittask shouldn't have been executed
+        LOG.debug("Waiting for ledgers to be marked as under replicated");
+        assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());        
+        
+        // wait for 3 seconds (since we already waited for 2 secs) for the ledger to get reported as under replicated
+        assertTrue("audit of lost bookie isn't delayed", underReplicaLatch.await(3, TimeUnit.SECONDS));
+        assertTrue("Ledger is not marked as underreplicated:" + ledgerId,
+                urLedgerList.contains(ledgerId));
+        Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
+        String data = urLedgerData.get(ledgerId);
+        assertTrue("Bookie " + shutdownBookie
+                + "is not listed in the ledger as missing replica :" + data,
+                data.contains(shutdownBookie));
+    }
+    
+    @Test(timeout=60000)
+    public void testTriggerAuditorWithNoPendingAuditTask() throws Exception {
+        // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+        int lostBookieRecoveryDelayConfValue = baseConf.getLostBookieRecoveryDelay();
+        Auditor auditorBookiesAuditor = getAuditorBookiesAuditor();
+        Future<?> auditTask = auditorBookiesAuditor.getAuditTask();
+        int lostBookieRecoveryDelayBeforeChange = auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange();
+        Assert.assertEquals("auditTask is supposed to be null", null, auditTask);
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to BaseConf's lostBookieRecoveryDelay",
+                lostBookieRecoveryDelayConfValue, lostBookieRecoveryDelayBeforeChange);
+        
+        // there is no easy way to validate whether the Auditor has executed the Audit process (Auditor.startAudit)
+        // without shutting down a Bookie. To test that resetting LostBookieRecoveryDelay triggers auditing
+        // even when there is no pending AuditTask, the following approach is needed.
+        
+        // Here we create LedgerMetadata for a few ledgers whose ensembles consist of non-existent bookies.
+        // When the Auditor runs an audit, it recognizes these ledgers as underreplicated and marks them as
+        // under-replicated, since these bookies are not available.
+        int numofledgers = 5;
+        Random rand = new Random();
+        for (int i = 0; i < numofledgers; i++) {
+            LedgerMetadata metadata = new LedgerMetadata(3, 2, 2, DigestType.CRC32, "passwd".getBytes(), null);
+            ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
+            ensemble.add(new BookieSocketAddress("99.99.99.99:9999"));
+            ensemble.add(new BookieSocketAddress("11.11.11.11:1111"));
+            ensemble.add(new BookieSocketAddress("88.88.88.88:8888"));
+            metadata.addEnsemble(0, ensemble);
+            LedgerManager ledgerManager = LedgerManagerFactory.newLedgerManagerFactory(baseClientConf, zkc)
+                    .newLedgerManager();
+            MutableInt ledgerCreateRC = new MutableInt(-1);
+            CountDownLatch latch = new CountDownLatch(1);
+            long ledgerId = (Math.abs(rand.nextLong())) % 100000000;
+            ledgerManager.createLedgerMetadata(ledgerId, metadata,
+                    new BookkeeperInternalCallbacks.GenericCallback<Void>() {
+                        @Override
+                        public void operationComplete(int rc, Void result) {
+                            ledgerCreateRC.setValue(rc);
+                            latch.countDown();
+                        }
+                    });
+            Assert.assertTrue("Ledger creation should complete within 2 secs",
+                    latch.await(2000, TimeUnit.MILLISECONDS));
+            Assert.assertEquals("LedgerCreate should succeed and return OK rc value", BKException.Code.OK,
+                    ledgerCreateRC.getValue());
+            ledgerList.add(ledgerId);
+        }
+        
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());
+        urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelayBeforeChange);
+        assertTrue("Audit should be triggered and created ledgers should be marked as underreplicated",
+                underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("All the ledgers should be marked as underreplicated", ledgerList.size(), urLedgerList.size());
+        
+        auditTask = auditorBookiesAuditor.getAuditTask();
+        Assert.assertEquals("auditTask is supposed to be null", null, auditTask);
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to BaseConf's lostBookieRecoveryDelay",
+                lostBookieRecoveryDelayBeforeChange, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+    }
+    
+    @Test(timeout=60000)
+    public void testTriggerAuditorWithPendingAuditTask() throws Exception {
+     // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+        
+        Auditor auditorBookiesAuditor = getAuditorBookiesAuditor();
+        LedgerHandle lh1 = createAndAddEntriesToLedger();
+        Long ledgerId = lh1.getId();
+        LOG.debug("Created ledger : " + ledgerId);
+        ledgerList.add(ledgerId);
+        lh1.close();
+
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList
+                .size());
+
+        int lostBookieRecoveryDelay = 5;
+        // wait for 5 seconds before starting the recovery work when a bookie fails
+        urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+
+        // shutdown a non auditor bookie; choosing non-auditor to avoid another election
+        String shutdownBookie = shutDownNonAuditorBookie();
+
+        LOG.debug("Waiting for ledgers to be marked as under replicated");
+        assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());
+        
+        Future<?> auditTask = auditorBookiesAuditor.getAuditTask();
+        Assert.assertNotEquals("auditTask is not supposed to be null", null, auditTask);
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set",
+                lostBookieRecoveryDelay, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+        
+        // set lostBookieRecoveryDelay to 5 (previous value), so that Auditor is triggered immediately 
+        urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+        assertTrue("audit of lost bookie shouldn't be delayed", underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("all under replicated ledgers should be identified", ledgerList.size(),
+                urLedgerList.size());        
+        
+        Thread.sleep(100);
+        auditTask = auditorBookiesAuditor.getAuditTask();
+        Assert.assertEquals("auditTask is supposed to be null", null, auditTask);
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to previously set value",
+                lostBookieRecoveryDelay, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+    }
+    
+    @Test(timeout=60000)
+    public void testTriggerAuditorBySettingDelayToZeroWithPendingAuditTask() throws Exception {
+     // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+        
+        Auditor auditorBookiesAuditor = getAuditorBookiesAuditor();
+        LedgerHandle lh1 = createAndAddEntriesToLedger();
+        Long ledgerId = lh1.getId();
+        LOG.debug("Created ledger : " + ledgerId);
+        ledgerList.add(ledgerId);
+        lh1.close();
+
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList
+                .size());
+
+        int lostBookieRecoveryDelay = 5;
+        // wait for 5 seconds before starting the recovery work when a bookie fails
+        urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+
+        // shutdown a non auditor bookie; choosing non-auditor to avoid another election
+        String shutdownBookie = shutDownNonAuditorBookie();
+
+        LOG.debug("Waiting for ledgers to be marked as under replicated");
+        assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());
+        
+        Future<?> auditTask = auditorBookiesAuditor.getAuditTask();
+        Assert.assertNotEquals("auditTask is not supposed to be null", null, auditTask);
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set",
+                lostBookieRecoveryDelay, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+        
+        // set lostBookieRecoveryDelay to 0, so that Auditor is triggered immediately 
+        urLedgerMgr.setLostBookieRecoveryDelay(0);
+        assertTrue("audit of lost bookie shouldn't be delayed", underReplicaLatch.await(1, TimeUnit.SECONDS));
+        assertEquals("all under replicated ledgers should be identified", ledgerList.size(),
+                urLedgerList.size());        
+        
+        Thread.sleep(100);
+        auditTask = auditorBookiesAuditor.getAuditTask();
+        Assert.assertEquals("auditTask is supposed to be null", null, auditTask);
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to previously set value",
+                0, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+    }
+    
     /**
      * Test audit of bookies is delayed when one bookie is down. But when
      * another one goes down, the audit is started immediately.
@@ -414,7 +665,7 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase {
         CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());
 
         // wait for 10 seconds before starting the recovery work when a bookie fails
-        baseConf.setLostBookieRecoveryDelay(10);
+        urLedgerMgr.setLostBookieRecoveryDelay(10);
 
         // shutdown a non auditor bookie to avoid an election
         String shutdownBookie1 = shutDownNonAuditorBookie();
@@ -467,7 +718,7 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase {
         CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());
 
         // wait for 5 seconds before starting the recovery work when a bookie fails
-        baseConf.setLostBookieRecoveryDelay(5);
+        urLedgerMgr.setLostBookieRecoveryDelay(5);
 
         // shutdown a non auditor bookie to avoid an election
         int idx1 = getShutDownNonAuditorBookieIdx("");
@@ -647,6 +898,12 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase {
         return auditors.get(0);
     }
 
+    private Auditor getAuditorBookiesAuditor() throws Exception {
+        BookieServer auditorBookieServer = getAuditorBookie();
+        String bookieAddr = auditorBookieServer.getLocalAddress().toString();
+        return auditorElectors.get(bookieAddr).auditor;
+    }
+
     private String  shutDownNonAuditorBookie() throws Exception {
         // shutdown bookie which is not an auditor
         int indexOf = bs.indexOf(getAuditorBookie());

-- 
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>.

Mime
View raw message