Return-Path:
X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io
Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io
Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A1B4D200CCC for ; Fri, 7 Jul 2017 05:11:43 +0200 (CEST)
Received: by cust-asf.ponee.io (Postfix) id 9FED0168028; Fri, 7 Jul 2017 03:11:43 +0000 (UTC)
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9B42C16802A for ; Fri, 7 Jul 2017 05:11:41 +0200 (CEST)
Received: (qmail 95841 invoked by uid 500); 7 Jul 2017 03:11:39 -0000
Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: bookkeeper-dev@bookkeeper.apache.org
Delivered-To: mailing list commits@bookkeeper.apache.org
Received: (qmail 95002 invoked by uid 99); 7 Jul 2017 03:11:39 -0000
Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Jul 2017 03:11:39 +0000
Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8C82587573; Fri, 7 Jul 2017 03:11:35 +0000 (UTC)
Date: Fri, 07 Jul 2017 03:11:35 +0000
To: "commits@bookkeeper.apache.org"
Subject: [bookkeeper] branch master updated: BOOKKEEPER-1028 and BOOKKEEPER-1029
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
Message-ID: <149939709571.24393.10749142738845781962@gitbox.apache.org>
From: reddycharan@apache.org
Reply-To: "commits@bookkeeper.apache.org"
X-Git-Host: gitbox.apache.org
X-Git-Repo: bookkeeper
X-Git-Refname: refs/heads/master
X-Git-Reftype: branch
X-Git-Oldrev: b59d63e8be237e80872f73d92a31b4213d9a649b
X-Git-Newrev: 29eb420f1cca07c1a3a7748ed7fbb8a5bc54f2fa
X-Git-Rev: 29eb420f1cca07c1a3a7748ed7fbb8a5bc54f2fa
X-Git-NotificationType: ref_changed_plus_diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated
archived-at: Fri, 07 Jul 2017 03:11:43 -0000

This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

The following commit(s) were added to refs/heads/master by this push:
     new 29eb420  BOOKKEEPER-1028 and BOOKKEEPER-1029

29eb420 is described below

commit 29eb420f1cca07c1a3a7748ed7fbb8a5bc54f2fa
Author: cguttapalem
AuthorDate: Thu Jul 6 20:11:25 2017 -0700

    BOOKKEEPER-1028 and BOOKKEEPER-1029

    BOOKKEEPER-1028: inc/excl opts listunderreplicated

    - Introduce including and excluding BookieId options for listunderreplicatedLedgers
    - Limitation: the replicas list is not updated in the underreplicated ledger zNode, so the listed information may be stale

    ---------------------------------------------------------

    BOOKKEEPER-1029: BookieDecommision Workflow

    - LostBookieRecoveryDelay config param is stored in ZK
    - If LostBookieRecoveryDelay is reset to the same value, an audit is force-triggered immediately
    - Added logic to either trigger the pending audit task immediately or reschedule it, depending on the new value in ZK
    - Added test cases validating force-triggering and rescheduling of the audit task
    - Added a bookieshell command to get/set LostBookieRecoveryDelay in ZK
    - Added a bookieshell command (triggeraudit) that triggers an audit by resetting LostBookieRecoveryDelay
    - Added the decommissionbookie bkshell command, which validates that all ledgers stored in the bookie have been fully replicated

    Author: cguttapalem
    Author: Charan Reddy Guttapalem

    Reviewers: Enrico Olivelli , Sijie Guo

    This closes #127 from reddycharan/listunderreplicatedpredicate and squashes the following commits:

    34bacf3c [cguttapalem] BOOKKEEPER-1029: BookieDecommision Workflow
    eb43ec49 [Charan Reddy Guttapalem] BOOKKEEPER-1029: BookieDecommision Workflow
    fcb399df [cguttapalem] BOOKKEEPER-1028: inc/excl opts listunderreplicated
---
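The include/exclude filtering described under BOOKKEEPER-1028 reduces to a single `java.util.function.Predicate` over a ledger's missing-replicas list. The following is a minimal sketch, not the project's code: it assumes bookie IDs are compared as plain strings (as in the `listunderreplicated` change), and the class `MissingReplicaFilter` and its methods are hypothetical names. It composes the two options with `Predicate.and` instead of the three explicit branches used in `BookieShell`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper (not part of BookKeeper): folds the two listunderreplicated
// options into one filter over a ledger's missing-replicas list.
class MissingReplicaFilter {

    // Stand-in for commons-lang StringUtils.isBlank: null, empty, or only
    // characters removed by String.trim() count as "option not given".
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Returns null when neither option is set, so a caller can skip filtering
    // (and skip reading each underreplicated ledger's ZNode) entirely.
    static Predicate<List<String>> build(String includingBookieId, String excludingBookieId) {
        Predicate<List<String>> predicate = null;
        if (!isBlank(includingBookieId)) {
            // keep ledgers whose missing replicas include this bookie
            predicate = replicas -> replicas.contains(includingBookieId);
        }
        if (!isBlank(excludingBookieId)) {
            // drop ledgers whose missing replicas include this bookie
            Predicate<List<String>> exclude = replicas -> !replicas.contains(excludingBookieId);
            predicate = (predicate == null) ? exclude : predicate.and(exclude);
        }
        return predicate;
    }

    public static void main(String[] args) {
        List<String> missingReplicas = Arrays.asList("10.0.0.1:3181", "10.0.0.2:3181");
        Predicate<List<String>> filter = build("10.0.0.1:3181", "10.0.0.3:3181");
        System.out.println(filter.test(missingReplicas)); // true
        System.out.println(build(null, " ") == null);     // true
    }
}
```

The composed predicate accepts exactly the same lists as the if/else chain in the commit: both options set means "contains the included bookie and not the excluded one", and neither set yields `null`. As the commit message notes, the predicate can only see the replicas list recorded in the underreplicated ledger zNode, so its answers may be stale.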
 bookkeeper-server/conf/log4j.shell.properties      |   1 +
 .../org/apache/bookkeeper/bookie/BookieShell.java  | 176 +++++++++++-
 .../apache/bookkeeper/client/BookKeeperAdmin.java  | 309 +++++++++++++++++++++
 .../apache/bookkeeper/client/LedgerChecker.java    |   5 +
 .../apache/bookkeeper/client/LedgerMetadata.java   |   2 +-
 .../meta/LedgerUnderreplicationManager.java        |  52 +++-
 .../meta/ZkLedgerUnderreplicationManager.java      | 122 +++++++-
 .../org/apache/bookkeeper/replication/Auditor.java | 144 ++++++++--
 .../replication/ReplicationException.java          |   2 +-
 .../bookkeeper/util/BookKeeperConstants.java       |   1 +
 .../bookkeeper/client/BookKeeperAdminTest.java     | 230 +++++++++++++++
 .../replication/AuditorLedgerCheckerTest.java      | 267 +++++++++++++++++-
 12 files changed, 1262 insertions(+), 49 deletions(-)

diff --git a/bookkeeper-server/conf/log4j.shell.properties b/bookkeeper-server/conf/log4j.shell.properties
index dcdc77c..58d6ea6 100644
--- a/bookkeeper-server/conf/log4j.shell.properties
+++ b/bookkeeper-server/conf/log4j.shell.properties
@@ -39,3 +39,4 @@ log4j.appender.CONSOLE.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n
 log4j.logger.org.apache.zookeeper=ERROR
 log4j.logger.org.apache.bookkeeper=ERROR
 log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
+log4j.logger.org.apache.bookkeeper.client.BookKeeperAdmin=INFO
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 3fc9d25..42d4de4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -44,15 +44,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
@@ -94,7 +95,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractFuture;
-
 /**
  * Bookie Shell is to provide utilities for users to administer a bookkeeper cluster.
  */
@@ -126,6 +126,9 @@ public class BookieShell implements Tool {
     static final String CMD_UPDATELEDGER = "updateledgers";
     static final String CMD_DELETELEDGER = "deleteledger";
     static final String CMD_BOOKIEINFO = "bookieinfo";
+    static final String CMD_DECOMMISSIONBOOKIE = "decommissionbookie";
+    static final String CMD_LOSTBOOKIERECOVERYDELAY = "lostbookierecoverydelay";
+    static final String CMD_TRIGGERAUDIT = "triggeraudit";
     static final String CMD_HELP = "help";
 
     final ServerConfiguration bkConf = new ServerConfiguration();
@@ -498,6 +501,8 @@ public class BookieShell implements Tool {
 
         public ListUnderreplicatedCmd() {
             super(CMD_LISTUNDERREPLICATED);
+            opts.addOption("missingreplica", true, "Bookie Id of missing replica");
+            opts.addOption("excludingmissingreplica", true, "Bookie Id of missing replica to ignore");
         }
 
         @Override
@@ -507,16 +512,30 @@ public class BookieShell implements Tool {
 
         @Override
         String getDescription() {
-            return "List ledgers marked as underreplicated";
+            return "List ledgers marked as underreplicated, with optional options to specify missingreplica (BookieId) and to exclude missingreplica";
         }
 
         @Override
         String getUsage() {
-            return "listunderreplicated";
+            return "listunderreplicated [[-missingreplica ] [-excludingmissingreplica ]]";
         }
 
         @Override
         int runCmd(CommandLine cmdLine) throws Exception {
+
+            final String includingBookieId = cmdLine.getOptionValue("missingreplica");
+            final String excludingBookieId = cmdLine.getOptionValue("excludingmissingreplica");
+
+            Predicate> predicate = null;
+            if (!StringUtils.isBlank(includingBookieId) && !StringUtils.isBlank(excludingBookieId)) {
+                predicate = replicasList -> (replicasList.contains(includingBookieId)
+                        && !replicasList.contains(excludingBookieId));
+            } else if (!StringUtils.isBlank(includingBookieId)) {
+                predicate = replicasList -> replicasList.contains(includingBookieId);
+            } else if (!StringUtils.isBlank(excludingBookieId)) {
+                predicate = replicasList -> !replicasList.contains(excludingBookieId);
+            }
+
             ZooKeeper zk = null;
             try {
                 zk = ZooKeeperClient.newBuilder()
@@ -525,7 +544,7 @@ public class BookieShell implements Tool {
                         .build();
                 LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
                 LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
-                Iterator iter = underreplicationManager.listLedgersToRereplicate();
+                Iterator iter = underreplicationManager.listLedgersToRereplicate(predicate);
                 while (iter.hasNext()) {
                     System.out.println(iter.next());
                 }
@@ -1327,6 +1346,65 @@ public class BookieShell implements Tool {
     }
 
     /**
+     * Setter and Getter for LostBookieRecoveryDelay value (in seconds) in Zookeeper
+     */
+    class LostBookieRecoveryDelayCmd extends MyCommand {
+        Options opts = new Options();
+
+        public LostBookieRecoveryDelayCmd() {
+            super(CMD_LOSTBOOKIERECOVERYDELAY);
+            opts.addOption("g", "get", false, "Get LostBookieRecoveryDelay value (in seconds)");
+            opts.addOption("s", "set", true, "Set LostBookieRecoveryDelay value (in seconds)");
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Setter and Getter for LostBookieRecoveryDelay value (in seconds) in Zookeeper";
+        }
+
+        @Override
+        String getUsage() {
+            return "lostbookierecoverydelay [-get|-set ]";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            boolean getter = cmdLine.hasOption("g");
+            boolean setter = cmdLine.hasOption("s");
+
+            if ((!getter && !setter) || (getter && setter)) {
+                LOG.error("One and only one of -get and -set must be specified");
+                printUsage();
+                return 1;
+            }
+            ClientConfiguration adminConf = new ClientConfiguration(bkConf);
+            BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
+            try {
+                if (getter) {
+                    int lostBookieRecoveryDelay = admin.getLostBookieRecoveryDelay();
+                    LOG.info("LostBookieRecoveryDelay value in ZK: {}", String.valueOf(lostBookieRecoveryDelay));
+                } else {
+                    int lostBookieRecoveryDelay = Integer.parseInt(cmdLine.getOptionValue("set"));
+                    admin.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+                    LOG.info("Successfully set LostBookieRecoveryDelay value in ZK: {}",
+                            String.valueOf(lostBookieRecoveryDelay));
+                }
+            } finally {
+                if (admin != null) {
+                    admin.close();
+                }
+            }
+            return 0;
+        }
+    }
+
+
+    /**
      * Print which node has the auditor lock
      */
     class WhoIsAuditorCmd extends MyCommand {
@@ -1825,6 +1903,91 @@ public class BookieShell implements Tool {
     }
 
     /**
+     * Command to trigger AuditTask by resetting lostBookieRecoveryDelay to its current value
+     */
+    class TriggerAuditCmd extends MyCommand {
+        Options opts = new Options();
+
+        TriggerAuditCmd() {
+            super(CMD_TRIGGERAUDIT);
+        }
+
+        @Override
+        String getDescription() {
+            return "Force trigger the Audit by resetting the lostBookieRecoveryDelay";
+        }
+
+        @Override
+        String getUsage() {
+            return CMD_TRIGGERAUDIT;
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            ClientConfiguration adminConf = new ClientConfiguration(bkConf);
+            BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
+            try {
+                admin.triggerAudit();
+            } finally {
+                if (admin != null) {
+                    admin.close();
+                }
+            }
+            return 0;
+        }
+    }
+
+    /**
+     * Command to trigger AuditTask by resetting lostBookieRecoveryDelay and then make sure the
+     * ledgers stored in the bookie are properly replicated.
+     */
+    class DecommissionBookieCmd extends MyCommand {
+        Options lOpts = new Options();
+
+        DecommissionBookieCmd() {
+            super(CMD_DECOMMISSIONBOOKIE);
+        }
+
+        @Override
+        String getDescription() {
+            return "Force trigger the Audittask and make sure all the ledgers stored in the decommissioning bookie are replicated";
+        }
+
+        @Override
+        String getUsage() {
+            return CMD_DECOMMISSIONBOOKIE;
+        }
+
+        @Override
+        Options getOptions() {
+            return lOpts;
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            ClientConfiguration adminConf = new ClientConfiguration(bkConf);
+            BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
+            try {
+                BookieSocketAddress thisBookieAddress = Bookie.getBookieAddress(bkConf);
+                admin.decommissionBookie(thisBookieAddress);
+                return 0;
+            } catch (Exception e) {
+                LOG.error("Received exception in DecommissionBookieCmd ", e);
+                return -1;
+            } finally {
+                if (admin != null) {
+                    admin.close();
+                }
+            }
+        }
+    }
+
+    /**
      * A facility for reporting update ledger progress.
      */
     public interface UpdateLedgerNotifier {
@@ -1855,7 +2018,10 @@ public class BookieShell implements Tool {
         commands.put(CMD_UPDATELEDGER, new UpdateLedgerCmd());
         commands.put(CMD_DELETELEDGER, new DeleteLedgerCmd());
         commands.put(CMD_BOOKIEINFO, new BookieInfoCmd());
+        commands.put(CMD_DECOMMISSIONBOOKIE, new DecommissionBookieCmd());
         commands.put(CMD_HELP, new HelpCmd());
+        commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new LostBookieRecoveryDelayCmd());
+        commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 16d39ce..8cf8833 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -35,19 +35,35 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
 
+import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
 import org.apache.bookkeeper.client.BookKeeper.SyncOpenCallback;
 import org.apache.bookkeeper.client.LedgerFragmentReplicator.SingleFragmentCallback;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.replication.AuditorElector;
+import org.apache.bookkeeper.replication.BookieLedgerIndexer;
+import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -64,6 +80,8 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.AbstractFuture;
+
 /**
  * Admin client for BookKeeper clusters
  */
@@ -89,6 +107,14 @@ public class BookKeeperAdmin implements AutoCloseable {
      */
     private Random rand = new Random();
 
+    private LedgerManagerFactory mFactory;
+
+    /*
+     * underreplicationManager is not initialized as part of constructor use its
+     * getter (getUnderreplicationManager) so that it can be lazy-initialized
+     */
+    private LedgerUnderreplicationManager underreplicationManager;
+
     /**
      * Constructor that takes in a ZooKeeper servers connect string so we know
      * how to connect to ZooKeeper to retrieve information about the BookKeeper
@@ -144,6 +170,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         bkc = new BookKeeper(conf, zk);
         ownsBK = true;
         this.lfr = new LedgerFragmentReplicator(bkc, NullStatsLogger.INSTANCE);
+        this.mFactory = bkc.ledgerManagerFactory;
     }
 
     /**
@@ -162,6 +189,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         ownsZK = false;
         this.bookiesPath = bkc.getConf().getZkAvailableBookiesPath();
         this.lfr = new LedgerFragmentReplicator(bkc, statsLogger);
+        this.mFactory = bkc.ledgerManagerFactory;
     }
 
     public BookKeeperAdmin(final BookKeeper bkc) {
@@ -1064,4 +1092,285 @@ public class BookKeeperAdmin implements AutoCloseable {
     public LedgerMetadata getLedgerMetadata(LedgerHandle lh) {
         return lh.getLedgerMetadata();
     }
+
+    private LedgerUnderreplicationManager getUnderreplicationManager()
+            throws CompatibilityException, KeeperException, InterruptedException {
+        if (underreplicationManager == null) {
+            underreplicationManager = mFactory.newLedgerUnderreplicationManager();
+        }
+        return underreplicationManager;
+    }
+
+    /**
+     * Setter for LostBookieRecoveryDelay value (in seconds) in Zookeeper
+     *
+     * @param lostBookieRecoveryDelay
+     *            lostBookieRecoveryDelay value (in seconds) to set
+     * @throws CompatibilityException
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws UnavailableException
+     */
+    public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay)
+            throws CompatibilityException, KeeperException, InterruptedException, UnavailableException {
+        LedgerUnderreplicationManager urlManager = getUnderreplicationManager();
+        urlManager.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+    }
+
+    /**
+     * returns the current LostBookieRecoveryDelay value (in seconds) in Zookeeper
+     *
+     * @return
+     *            current lostBookieRecoveryDelay value (in seconds)
+     * @throws CompatibilityException
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws UnavailableException
+     */
+    public int getLostBookieRecoveryDelay()
+            throws CompatibilityException, KeeperException, InterruptedException, UnavailableException {
+        LedgerUnderreplicationManager urlManager = getUnderreplicationManager();
+        return urlManager.getLostBookieRecoveryDelay();
+    }
+
+    /**
+     * trigger AuditTask by resetting lostBookieRecoveryDelay to its current
+     * value. If Autorecovery is not enabled or if there is no Auditor then this
+     * method will throw UnavailableException.
+     *
+     * @throws CompatibilityException
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws UnavailableException
+     * @throws IOException
+     */
+    public void triggerAudit()
+            throws CompatibilityException, KeeperException, InterruptedException, UnavailableException, IOException {
+        LedgerUnderreplicationManager urlManager = getUnderreplicationManager();
+        if (!urlManager.isLedgerReplicationEnabled()) {
+            LOG.error("Autorecovery is disabled. So giving up!");
+            throw new UnavailableException("Autorecovery is disabled. So giving up!");
+        }
+
+        BookieSocketAddress auditorId = AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.conf), zk);
+        if (auditorId == null) {
+            LOG.error("No auditor elected, though Autorecovery is enabled. So giving up.");
+            throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up.");
+        }
+
+        int previousLostBookieRecoveryDelayValue = urlManager.getLostBookieRecoveryDelay();
+        LOG.info("Resetting LostBookieRecoveryDelay value: {}, to kickstart audit task",
+                previousLostBookieRecoveryDelayValue);
+        urlManager.setLostBookieRecoveryDelay(previousLostBookieRecoveryDelayValue);
+    }
+
+    /**
+     * Triggers AuditTask by resetting lostBookieRecoveryDelay and then make
+     * sure the ledgers stored in the given decommissioning bookie are properly
+     * replicated and they are not underreplicated because of the given bookie.
+     * This method waits untill there are no underreplicatedledgers because of this
+     * bookie. If the given Bookie is not shutdown yet, then it will throw
+     * BKIllegalOpException.
+     *
+     * @param bookieAddress
+     *            address of the decommissioning bookie
+     * @throws CompatibilityException
+     * @throws UnavailableException
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws IOException
+     * @throws BKAuditException
+     * @throws TimeoutException
+     * @throws BKException
+     */
+    public void decommissionBookie(BookieSocketAddress bookieAddress)
+            throws CompatibilityException, UnavailableException, KeeperException, InterruptedException, IOException,
+            BKAuditException, TimeoutException, BKException {
+        if (getAvailableBookies().contains(bookieAddress) || getReadOnlyBookies().contains(bookieAddress)) {
+            LOG.error("Bookie: {} is not shutdown yet", bookieAddress);
+            throw BKException.create(BKException.Code.IllegalOpException);
+        }
+
+        triggerAudit();
+
+        /*
+         * Sleep for 30 secs, so that Auditor gets chance to trigger its
+         * force audittask and let the underreplicationmanager process
+         * to do its replication process
+         */
+        Thread.sleep(30 * 1000);
+
+        /*
+         * get the collection of the ledgers which are stored in this
+         * bookie, by making a call to
+         * bookieLedgerIndexer.getBookieToLedgerIndex.
+         */
+
+        BookieLedgerIndexer bookieLedgerIndexer = new BookieLedgerIndexer(bkc.ledgerManager);
+        Map> bookieToLedgersMap = bookieLedgerIndexer.getBookieToLedgerIndex();
+        Set ledgersStoredInThisBookie = bookieToLedgersMap.get(bookieAddress.toString());
+        if ((ledgersStoredInThisBookie != null) && (!ledgersStoredInThisBookie.isEmpty())) {
+            /*
+             * wait untill all the ledgers are replicated to other
+             * bookies by making sure that these ledgers metadata don't
+             * contain this bookie as part of their ensemble.
+             */
+            waitForLedgersToBeReplicated(ledgersStoredInThisBookie, bookieAddress, bkc.ledgerManager);
+        }
+
+        // for double-checking, check if any ledgers are listed as underreplicated because of this bookie
+        Predicate> predicate = replicasList -> replicasList.contains(bookieAddress.toString());
+        Iterator urLedgerIterator = underreplicationManager.listLedgersToRereplicate(predicate);
+        if (urLedgerIterator.hasNext()) {
+            //if there are any then wait and make sure those ledgers are replicated properly
+            LOG.info("Still in some underreplicated ledgers metadata, this bookie is part of its ensemble. "
+                    + "Have to make sure that those ledger fragments are rereplicated");
+            List urLedgers = new ArrayList<>();
+            urLedgerIterator.forEachRemaining(urLedgers::add);
+            waitForLedgersToBeReplicated(urLedgers, bookieAddress, bkc.ledgerManager);
+        }
+    }
+
+    private void waitForLedgersToBeReplicated(Collection ledgers, BookieSocketAddress thisBookieAddress,
+            LedgerManager ledgerManager) throws InterruptedException, TimeoutException {
+        int maxSleepTimeInBetweenChecks = 10 * 60 * 1000; // 10 minutes
+        int sleepTimePerLedger = 10 * 1000; // 10 secs
+        Predicate validateBookieIsNotPartOfEnsemble = ledgerId -> !areEntriesOfLedgerStoredInTheBookie(ledgerId,
+                thisBookieAddress, ledgerManager);
+        while (!ledgers.isEmpty()) {
+            LOG.info("Count of Ledgers which need to be rereplicated: {}", ledgers.size());
+            int sleepTimeForThisCheck = ledgers.size() * sleepTimePerLedger > maxSleepTimeInBetweenChecks
+                    ? maxSleepTimeInBetweenChecks : ledgers.size() * sleepTimePerLedger;
+            Thread.sleep(sleepTimeForThisCheck);
+            LOG.debug("Making sure following ledgers replication to be completed: {}", ledgers);
+            ledgers.removeIf(validateBookieIsNotPartOfEnsemble);
+        }
+    }
+
+    private boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
+            LedgerManager ledgerManager) {
+        ReadMetadataCallback cb = new ReadMetadataCallback(ledgerId);
+        ledgerManager.readLedgerMetadata(ledgerId, cb);
+        try {
+            LedgerMetadata ledgerMetadata = cb.get();
+            Collection> ensemblesOfSegments = ledgerMetadata.getEnsembles().values();
+            Iterator> ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
+            ArrayList ensemble;
+            int segmentNo = 0;
+            while (ensemblesOfSegmentsIterator.hasNext()) {
+                ensemble = ensemblesOfSegmentsIterator.next();
+                if (ensemble.contains(bookieAddress)) {
+                    if (areEntriesOfSegmentStoredInTheBookie(ledgerMetadata, bookieAddress, segmentNo++)) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        } catch (InterruptedException | ExecutionException e) {
+            if (e.getCause() != null
+                    && e.getCause().getClass().equals(BKException.BKNoSuchLedgerExistsException.class)) {
+                LOG.debug("Ledger: {} has been deleted", ledgerId);
+                return false;
+            } else {
+                LOG.error("Got exception while trying to read LedgerMeatadata of " + ledgerId, e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private boolean areEntriesOfSegmentStoredInTheBookie(LedgerMetadata ledgerMetadata,
+            BookieSocketAddress bookieAddress, int segmentNo) {
+        boolean isLedgerClosed = ledgerMetadata.isClosed();
+        int ensembleSize = ledgerMetadata.getEnsembleSize();
+        int writeQuorumSize = ledgerMetadata.getWriteQuorumSize();
+
+        List>> segments = new LinkedList>>(
+                ledgerMetadata.getEnsembles().entrySet());
+
+        boolean lastSegment = (segmentNo == (segments.size() - 1));
+
+        /*
+         * Checking the last segment of the ledger can be complicated in
+         * some cases. In the case that the ledger is closed, we can just
+         * check the fragments of the segment as normal, except in the case
+         * that no entry was ever written, to the ledger, in which case we
+         * check no fragments.
+         *
+         * Following the same approach as in LedgerChecker.checkLedger
+         */
+        if (lastSegment && isLedgerClosed && (ledgerMetadata.getLastEntryId() < segments.get(segmentNo).getKey())) {
+            return false;
+        }
+
+        /*
+         * if ensembleSize is equal to writeQuorumSize, then ofcourse all
+         * the entries of this segment are supposed to be stored in this
+         * bookie. If this is last segment of the ledger and if the ledger
+         * is not closed (this is a corner case), then we have to return
+         * true. For more info. Check BOOKKEEPER-237 and BOOKKEEPER-325.
+         */
+        if ((lastSegment && !isLedgerClosed) || (ensembleSize == writeQuorumSize)) {
+            return true;
+        }
+
+        /*
+         * the following check is required because ensembleSize can be
+         * greater than writeQuorumSize and in this case if there are only
+         * couple of entries then based on RoundRobinDistributionSchedule
+         * there might not be any entry copy in this bookie though this
+         * bookie is part of the ensemble of this segment. If no entry is
+         * stored in this bookie then we should return false, because
+         * ReplicationWorker wont take care of fixing the ledgerMetadata of
+         * this segment in this case.
+         *
+         * if ensembleSize > writeQuorumSize, then in LedgerFragment.java
+         * firstEntryID may not be equal to firstStoredEntryId lastEntryId
+         * may not be equalto lastStoredEntryId. firstStoredEntryId and
+         * lastStoredEntryId will be LedgerHandle.INVALID_ENTRY_ID, if no
+         * entry of this segment stored in this bookie. In this case
+         * LedgerChecker.verifyLedgerFragment will not consider it as
+         * unavailable/bad fragment though this bookie is part of the
+         * ensemble of the segment and it is down.
+         */
+        DistributionSchedule distributionSchedule = new RoundRobinDistributionSchedule(
+                ledgerMetadata.getWriteQuorumSize(), ledgerMetadata.getAckQuorumSize(),
+                ledgerMetadata.getEnsembleSize());
+        ArrayList currentSegmentEnsemble = segments.get(segmentNo).getValue();
+        int thisBookieIndexInCurrentEnsemble = currentSegmentEnsemble.indexOf(bookieAddress);
+        long firstEntryId = segments.get(segmentNo).getKey();
+        long lastEntryId = lastSegment ? ledgerMetadata.getLastEntryId() : segments.get(segmentNo + 1).getKey() - 1;
+        long firstStoredEntryId = LedgerHandle.INVALID_ENTRY_ID;
+        long firstEntryIter = firstEntryId;
+        // following the same approach followed in LedgerFragment.getFirstStoredEntryId()
+        for (int i = 0; i < ensembleSize && firstEntryIter <= lastEntryId; i++) {
+            if (distributionSchedule.hasEntry(firstEntryIter, thisBookieIndexInCurrentEnsemble)) {
+                firstStoredEntryId = firstEntryIter;
+                break;
+            } else {
+                firstEntryIter++;
+            }
+        }
+        return firstStoredEntryId != LedgerHandle.INVALID_ENTRY_ID;
+    }
+
+    static class ReadMetadataCallback extends AbstractFuture
+            implements GenericCallback {
+        final long ledgerId;
+
+        ReadMetadataCallback(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
+        long getLedgerId() {
+            return ledgerId;
+        }
+
+        public void operationComplete(int rc, LedgerMetadata result) {
+            if (rc != 0) {
+                setException(BKException.create(rc));
+            } else {
+                set(result);
+            }
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index 4266c90..7f58131 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -88,6 +88,11 @@ public class LedgerChecker {
 
         long firstStored = fragment.getFirstStoredEntryId();
         long lastStored = fragment.getLastStoredEntryId();
+        // because of this if block, even if the bookie of the fragment is
+        // down, it considers Fragment is available/not-bad if firstStored
+        // and lastStored are LedgerHandle.INVALID_ENTRY_ID.
+        // So same logic is used in BookieShell.DecommissionBookieCmd.areEntriesOfSegmentStoredInTheBookie
+        // if any change is made here, then the changes should be in BookieShell also
         if (firstStored == LedgerHandle.INVALID_ENTRY_ID) {
             if (lastStored != LedgerHandle.INVALID_ENTRY_ID) {
                 throw new InvalidFragmentException();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 2641de9..92fc63e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -241,7 +241,7 @@ public class LedgerMetadata {
         state = LedgerMetadataFormat.State.CLOSED;
     }
 
-    void addEnsemble(long startEntryId, ArrayList ensemble) {
+    public void addEnsemble(long startEntryId, ArrayList ensemble) {
         assert ensembles.isEmpty() || startEntryId >= ensembles.lastKey();
 
         ensembles.put(startEntryId, ensemble);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index d006895..014c1a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -21,6 +21,8 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.ReplicationException;
 
 import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
 
 /**
  * Interface for marking ledgers which need to be rereplicated
@@ -42,11 +44,18 @@ public interface LedgerUnderreplicationManager {
 
     /**
      * Get a list of all the ledgers which have been
-     * marked for rereplication.
-     *
+     * marked for rereplication, filtered by the predicate on the missing replicas list.
+     *
+     * Missing replicas list of an underreplicated ledger is the list of the bookies which are part of
+     * the ensemble of this ledger and are currently unavailable/down.
+     *
+     * If filtering is not needed then it is suggested to pass null for predicate,
+     * otherwise it will read the content of the ZNode to decide on filtering.
+     *
+     * @param predicate filter to use while listing under replicated ledgers. 'null' if filtering is not required
      * @return an iterator which returns ledger ids
      */
-    Iterator listLedgersToRereplicate();
+    Iterator listLedgersToRereplicate(Predicate> predicate);
 
     /**
      * Acquire a underreplicated ledger for rereplication. The ledger
@@ -116,4 +125,41 @@ public interface LedgerUnderreplicationManager {
      */
     void notifyLedgerReplicationEnabled(GenericCallback cb)
             throws ReplicationException.UnavailableException;
+
+    /**
+     * Creates the zNode for lostBookieRecoveryDelay with the specified value and returns true.
+ * If the node already exists, it returns false. + * + * @param lostBookieRecoveryDelay + * @return + * true if it succeeds in creating zNode for lostBookieRecoveryDelay, false if it already exists + * @throws ReplicationException.UnavailableException + */ + boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) + throws ReplicationException.UnavailableException; + + /** + * Setter for the lostBookieRecoveryDelay znode + * + * @param lostBookieRecoveryDelay + * @throws ReplicationException.UnavailableException + */ + void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws ReplicationException.UnavailableException; + + /** + * Getter for the lostBookieRecoveryDelay + * + * @return the int value of lostBookieRecoveryDelay + * @throws ReplicationException.UnavailableException + */ + int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableException; + + /** + * Receive notification asynchronously when the lostBookieRecoveryDelay value is changed + * + * @param cb + * @throws ReplicationException.UnavailableException + */ + void notifyLostBookieRecoveryDelayChanged(GenericCallback<Void> cb) + throws ReplicationException.UnavailableException; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java index e9d48f3..e56ee30 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -49,7 +50,9 @@ import org.apache.zookeeper.CreateMode; import 
org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +60,6 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.protobuf.TextFormat; -import org.apache.zookeeper.data.ACL; /** * ZooKeeper implementation of underreplication manager. @@ -101,6 +103,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa private final String urLockPath; private final String layoutZNode; private final AbstractConfiguration conf; + private final String lostBookieRecoveryDelayZnode; private final ZooKeeper zkc; private final SubTreeCache subTreeCache; @@ -112,7 +115,8 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa urLedgerPath = basePath + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH; urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK; - + lostBookieRecoveryDelayZnode = basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE; + idExtractionPattern = Pattern.compile("urL(\\d+)$"); this.zkc = zkc; this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider() { @@ -341,8 +345,21 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa } } + /** + * Get a list of all the ledgers which have been + * marked for rereplication, filtered by the predicate on the replicas list. + * + * Replicas list of an underreplicated ledger is the list of the bookies which are part of + * the ensemble of this ledger and are currently unavailable/down. 
+ * + * If filtering is not needed, it is suggested to pass null for the predicate; + * otherwise the content of each ZNode is read to decide on filtering. + * + * @param predicate filter to use while listing underreplicated ledgers; 'null' if filtering is not required. + * @return an iterator which returns ledger ids + */ @Override - public Iterator<Long> listLedgersToRereplicate() { + public Iterator<Long> listLedgersToRereplicate(final Predicate<List<String>> predicate) { final Queue<String> queue = new LinkedList<String>(); queue.add(urLedgerPath); @@ -363,12 +380,20 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa while (queue.size() > 0 && curBatch.size() == 0) { String parent = queue.remove(); try { - for (String c : zkc.getChildren(parent,false)) { - String child = parent + "/" + c; - if (c.startsWith("urL")) { - curBatch.add(getLedgerId(child)); - } else { - queue.add(child); + for (String c : zkc.getChildren(parent, false)) { + try { + String child = parent + "/" + c; + if (c.startsWith("urL")) { + long ledgerId = getLedgerId(child); + if ((predicate == null) + || predicate.test(getLedgerUnreplicationInfo(ledgerId).getReplicaList())) { + curBatch.add(ledgerId); + } + } else { + queue.add(child); + } + } catch (KeeperException.NoNodeException nne) { + // ignore } } } catch (InterruptedException ie) { @@ -694,4 +719,83 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa zkc.delete(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), -1); } } + + @Override + public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws UnavailableException { + LOG.debug("initializeLostBookieRecoveryDelay()"); + try { + zkc.create(lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException ke) { + LOG.info( + "lostBookieRecoveryDelay Znode is already present, so using existing 
lostBookieRecoveryDelay Znode value"); + return false; + } catch (KeeperException ke) { + LOG.error("Error while initializing LostBookieRecoveryDelay", ke); + throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); + } + return true; + } + + @Override + public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws UnavailableException { + LOG.debug("setLostBookieRecoveryDelay()"); + try { + if (zkc.exists(lostBookieRecoveryDelayZnode, false) != null) { + zkc.setData(lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8), + -1); + } else { + zkc.create(lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (KeeperException ke) { + LOG.error("Error while setting LostBookieRecoveryDelay ", ke); + throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); + } + } + + @Override + public int getLostBookieRecoveryDelay() throws UnavailableException { + LOG.debug("getLostBookieRecoveryDelay()"); + try { + byte[] data = zkc.getData(lostBookieRecoveryDelayZnode, false, null); + return Integer.parseInt(new String(data, UTF_8)); + } catch (KeeperException ke) { + LOG.error("Error while getting LostBookieRecoveryDelay ", ke); + throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); + } + } + + @Override + public void 
notifyLostBookieRecoveryDelayChanged(GenericCallback<Void> cb) throws UnavailableException { + LOG.debug("notifyLostBookieRecoveryDelayChanged()"); + Watcher w = new Watcher() { + public void process(WatchedEvent e) { + if (e.getType() == Watcher.Event.EventType.NodeDataChanged) { + cb.operationComplete(0, null); + } + } + }; + try { + if (null == zkc.exists(lostBookieRecoveryDelayZnode, w)) { + cb.operationComplete(0, null); + return; + } + } catch (KeeperException ke) { + LOG.error("Error while checking the state of lostBookieRecoveryDelay", ke); + throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); + } + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 3df941f..054e09d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -20,8 +20,19 @@ */ package org.apache.bookkeeper.replication; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -41,35 +52,21 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.replication.ReplicationException.BKAuditException; import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; -import org.apache.bookkeeper.replication.ReplicationStats; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.commons.collections.CollectionUtils; import org.apache.zookeeper.AsyncCallback; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.SettableFuture; - import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Future; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.SettableFuture; /** * Auditor is a single entity in the entire Bookie cluster and will be watching @@ -101,7 +98,8 @@ public class Auditor implements BookiesListener { private final Counter numDelayedBookieAuditsCancelled; private volatile Future<?> auditTask; private Set<String> bookiesToBeAudited = Sets.newHashSet(); - + private volatile int lostBookieRecoveryDelayBeforeChange; + public Auditor(final String 
bookieIdentifier, ServerConfiguration conf, ZooKeeper zkc, StatsLogger statsLogger) throws UnavailableException { this.conf = conf; @@ -149,6 +147,15 @@ public class Auditor implements BookiesListener { LOG.info("AuthProvider used by the Auditor is "+clientConfiguration.getClientAuthProviderFactoryClass()); this.bkc = new BookKeeper(clientConfiguration, zkc); this.admin = new BookKeeperAdmin(bkc, statsLogger); + if (this.ledgerUnderreplicationManager + .initializeLostBookieRecoveryDelay(conf.getLostBookieRecoveryDelay())) { + LOG.info("Initializing lostBookieRecoveryDelay zNode to the config value: {}", + conf.getLostBookieRecoveryDelay()); + } else { + LOG.info( + "Valid lostBookieRecoveryDelay zNode is available, so not creating lostBookieRecoveryDelay zNode as part of Auditor initialization "); + } + lostBookieRecoveryDelayBeforeChange = this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay(); } catch (CompatibilityException ce) { throw new UnavailableException( "CompatibilityException while initializing Auditor", ce); @@ -191,7 +198,8 @@ public class Auditor implements BookiesListener { public void run() { try { waitIfLedgerReplicationDisabled(); - + int lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager + .getLostBookieRecoveryDelay(); List<String> availableBookies = getAvailableBookies(); // casting to String, as knownBookies and availableBookies @@ -220,7 +228,7 @@ public class Auditor implements BookiesListener { } knownBookies.removeAll(bookiesToBeAudited); - if (conf.getLostBookieRecoveryDelay() == 0) { + if (lostBookieRecoveryDelay == 0) { startAudit(false); bookiesToBeAudited.clear(); return; @@ -246,9 +254,9 @@ public class Auditor implements BookiesListener { auditTask = null; bookiesToBeAudited.clear(); } - }, conf.getLostBookieRecoveryDelay(), TimeUnit.SECONDS); + }, lostBookieRecoveryDelay, TimeUnit.SECONDS); numBookieAuditsDelayed.inc(); - LOG.info("Delaying bookie audit by " + conf.getLostBookieRecoveryDelay() +
LOG.info("Delaying bookie audit by " + lostBookieRecoveryDelay + "secs for " + bookiesToBeAudited.toString()); } } catch (BKException bke) { @@ -263,6 +271,64 @@ public class Auditor implements BookiesListener { }); } + synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() { + if (executor.isShutdown()) { + SettableFuture<Void> f = SettableFuture.<Void> create(); + f.setException(new BKAuditException("Auditor shutting down")); + return f; + } + return executor.submit(new Runnable() { + int lostBookieRecoveryDelay = -1; + public void run() { + try { + waitIfLedgerReplicationDisabled(); + lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager + .getLostBookieRecoveryDelay(); + // if there is a pending auditTask, cancel it so that it can be rescheduled + // after the new lostBookieRecoveryDelay period + if (auditTask != null) { + LOG.info("lostBookieRecoveryDelay period has been changed so canceling the pending AuditTask"); + auditTask.cancel(false); + numDelayedBookieAuditsCancelled.inc(); + } + + // if lostBookieRecoveryDelay is set to 0 or to its previous value then consider it as a + // signal to trigger the Audit immediately. + if ((lostBookieRecoveryDelay == 0) + || (lostBookieRecoveryDelay == lostBookieRecoveryDelayBeforeChange)) { + LOG.info( + "lostBookieRecoveryDelay has been set to 0 or reset to its previous value, so starting AuditTask. 
"" + + "Current lostBookieRecoveryDelay: {}, previous lostBookieRecoveryDelay: {}", + lostBookieRecoveryDelay, lostBookieRecoveryDelayBeforeChange); + startAudit(false); + auditTask = null; + bookiesToBeAudited.clear(); + } else if (auditTask != null) { + LOG.info("lostBookieRecoveryDelay has been set to {}, so rescheduling AuditTask accordingly", + lostBookieRecoveryDelay); + auditTask = executor.schedule(new Runnable() { + public void run() { + startAudit(false); + auditTask = null; + bookiesToBeAudited.clear(); + } + }, lostBookieRecoveryDelay, TimeUnit.SECONDS); + numBookieAuditsDelayed.inc(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.error("Interrupted while waiting for LedgersReplication to be enabled ", ie); + } catch (UnavailableException ue) { + LOG.error("Exception while reading from ZK", ue); + } finally { + if (lostBookieRecoveryDelay != -1) { + lostBookieRecoveryDelayBeforeChange = lostBookieRecoveryDelay; + } + } + } + }); + } + public void start() { LOG.info("I'm starting as Auditor Bookie. 
ID: {}", bookieIdentifier); // on startup watching available bookie and based on the @@ -317,6 +383,14 @@ public class Auditor implements BookiesListener { LOG.error("Couldn't get bookie list, exiting", bke); submitShutdownTask(); } + + try { + this.ledgerUnderreplicationManager + .notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb()); + } catch (UnavailableException ue) { + LOG.error("Exception while registering for LostBookieRecoveryDelay change notification", ue); + submitShutdownTask(); + } long bookieCheckInterval = conf.getAuditorPeriodicBookieCheckInterval(); if (bookieCheckInterval == 0) { @@ -330,6 +404,19 @@ public class Auditor implements BookiesListener { } } + private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> { + @Override + public void operationComplete(int rc, Void result) { + try { + Auditor.this.ledgerUnderreplicationManager + .notifyLostBookieRecoveryDelayChanged(LostBookieRecoveryDelayChangedCb.this); + } catch (UnavailableException ae) { + LOG.error("Exception while registering for a LostBookieRecoveryDelay notification", ae); + } + Auditor.this.submitLostBookieRecoveryDelayChangedEvent(); + } + } + private void waitIfLedgerReplicationDisabled() throws UnavailableException, InterruptedException { ReplicationEnableCb cb = new ReplicationEnableCb(); @@ -673,4 +760,11 @@ public class Auditor implements BookiesListener { } }; + int getLostBookieRecoveryDelayBeforeChange() { + return lostBookieRecoveryDelayBeforeChange; + } + + Future<?> getAuditTask() { + return auditTask; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java index 32a03e2..2ff7b94 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java @@ -64,7 
+64,7 @@ public abstract class ReplicationException extends Exception { /** * Exception while auditing bookie-ledgers */ - static class BKAuditException extends ReplicationException { + public static class BKAuditException extends ReplicationException { private static final long serialVersionUID = 95551905L; BKAuditException(String message, Throwable cause) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java index bae7715..362e1e6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java @@ -40,6 +40,7 @@ public class BookKeeperConstants { public static final String UNDER_REPLICATION_NODE = "underreplication"; public static final String UNDER_REPLICATION_LOCK = "locks"; public static final String DISABLE_NODE = "disable"; + public static final String LOSTBOOKIERECOVERYDELAY_NODE = "lostBookieRecoveryDelay"; public static final String DEFAULT_ZK_LEDGERS_ROOT_PATH = "/ledgers"; public static final String LAYOUT_ZNODE = "LAYOUT"; public static final String INSTANCEID = "INSTANCEID"; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java new file mode 100644 index 0000000..b197353 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java @@ -0,0 +1,230 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Iterator; + +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.client.BKException.BKIllegalOpException; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; +import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BookKeeperAdminTest extends BookKeeperClusterTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(BookKeeperAdminTest.class); + private DigestType digestType = DigestType.CRC32; + private static final String PASSWORD = "testPasswd"; + private static final int numOfBookies = 6; + private final int lostBookieRecoveryDelayInitValue = 1800; + + public BookKeeperAdminTest() { + super(numOfBookies); + baseConf.setAutoRecoveryDaemonEnabled(true); + baseConf.setLostBookieRecoveryDelay(lostBookieRecoveryDelayInitValue); + baseConf.setOpenLedgerRereplicationGracePeriod(String.valueOf(30000)); + } + + 
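The tests below call `listLedgersToRereplicate(null)`. The filtering contract described for that method works like this: a null predicate means no filtering; otherwise each ledger's missing replicas list is read and tested. The following is a minimal in-memory sketch of that contract. It assumes a plain map from ledger id to missing replicas list in place of the ZooKeeper underreplication tree. `UrLedgerFilterSketch`, `including()` and `excluding()` are hypothetical names that only mirror the include/exclude bookie options described for BOOKKEEPER-1028; they are not BookKeeper API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for the underreplicated-ledger znodes:
// ledger id -> missing replicas list (ensemble bookies that are down).
class UrLedgerFilterSketch {
    private final Map<Long, List<String>> urLedgers = new LinkedHashMap<>();

    void markUnderreplicated(long ledgerId, List<String> missingReplicas) {
        urLedgers.put(ledgerId, missingReplicas);
    }

    // Same contract as the javadoc in the diff: a null predicate disables filtering.
    Iterator<Long> listLedgersToRereplicate(Predicate<List<String>> predicate) {
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, List<String>> e : urLedgers.entrySet()) {
            // With a null predicate the replica list is never consulted; in the
            // ZooKeeper-backed manager that saves one znode read per ledger.
            if (predicate == null || predicate.test(e.getValue())) {
                result.add(e.getKey());
            }
        }
        return result.iterator();
    }

    // Hypothetical "include" filter: ledgers whose missing replicas contain bookieId.
    static Predicate<List<String>> including(String bookieId) {
        return replicas -> replicas.contains(bookieId);
    }

    // Hypothetical "exclude" filter: ledgers whose missing replicas do not contain bookieId.
    static Predicate<List<String>> excluding(String bookieId) {
        return replicas -> !replicas.contains(bookieId);
    }

    // Drains an iterator into a list so results are easy to compare.
    static List<Long> toList(Iterator<Long> it) {
        List<Long> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }
}
```

For example, `m.listLedgersToRereplicate(UrLedgerFilterSketch.including("bookie1:3181"))` would list only the ledgers that are missing a replica on that bookie. The commit message notes a limitation: the replica list stored in an underreplicated ledger znode is not updated afterwards. Any include/exclude filter of this kind can therefore act on stale information.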
@Test(timeout = 60000) + public void testLostBookieRecoveryDelayValue() throws Exception { + BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString()); + assertEquals("LostBookieRecoveryDelay", lostBookieRecoveryDelayInitValue, bkAdmin.getLostBookieRecoveryDelay()); + int newLostBookieRecoveryDelayValue = 2400; + bkAdmin.setLostBookieRecoveryDelay(newLostBookieRecoveryDelayValue); + assertEquals("LostBookieRecoveryDelay", newLostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay()); + assertEquals("LostBookieRecoveryDelay", newLostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay()); + newLostBookieRecoveryDelayValue = 3000; + bkAdmin.setLostBookieRecoveryDelay(newLostBookieRecoveryDelayValue); + assertEquals("LostBookieRecoveryDelay", newLostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay()); + bkAdmin.close(); + } + + @Test(timeout = 60000) + public void testTriggerAudit() throws Exception { + ZkLedgerUnderreplicationManager urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc); + BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString()); + int lostBookieRecoveryDelayValue = bkAdmin.getLostBookieRecoveryDelay(); + urLedgerMgr.disableLedgerReplication(); + try { + bkAdmin.triggerAudit(); + Assert.fail("Trigger Audit should have failed because LedgerReplication is disabled"); + } catch (UnavailableException une) { + // expected + } + assertEquals("LostBookieRecoveryDelay", lostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay()); + urLedgerMgr.enableLedgerReplication(); + bkAdmin.triggerAudit(); + assertEquals("LostBookieRecoveryDelay", lostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay()); + long ledgerId = 1L; + LedgerHandle ledgerHandle = bkc.createLedgerAdv(ledgerId, numBookies, numBookies, numBookies, digestType, + PASSWORD.getBytes(), null); + ledgerHandle.addEntry(0, "data".getBytes()); + ledgerHandle.close(); + + killBookie(1); + /* + 
+ * since lostBookieRecoveryDelay is set, when a bookie dies, it will + * not start the Audit process immediately. But when triggerAudit is called + * it will force the audit process. + */ + bkAdmin.triggerAudit(); + Thread.sleep(500); + Iterator<Long> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null); + assertTrue("There are supposed to be underreplicatedledgers", ledgersToRereplicate.hasNext()); + assertEquals("Underreplicated ledgerId", ledgerId, ledgersToRereplicate.next().longValue()); + bkAdmin.close(); + } + + @Test(timeout = 480000) + public void testDecommissionBookie() throws Exception { + ZkLedgerUnderreplicationManager urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc); + BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString()); + + int numOfLedgers = 2 * numOfBookies; + int numOfEntries = 2 * numOfBookies; + for (int i = 0; i < numOfLedgers; i++) { + LedgerHandle lh = bkc.createLedger(3, 2, digestType, PASSWORD.getBytes()); + for (int j = 0; j < numOfEntries; j++) { + lh.addEntry("entry".getBytes()); + } + lh.close(); + } + /* + * create ledgers having empty segments (segment with no entries) + */ + for (int i = 0; i < numOfLedgers; i++) { + LedgerHandle emptylh = bkc.createLedger(3, 2, digestType, PASSWORD.getBytes()); + emptylh.close(); + } + + try { + /* + * if we try to call decommissionBookie for a bookie which is not + * shut down, then it should throw BKIllegalOpException + */ + bkAdmin.decommissionBookie(bs.get(0).getLocalAddress()); + fail("Expected BKIllegalOpException because that bookie is not shutdown yet"); + } catch (BKIllegalOpException bkioexc) { + // expected BKIllegalOpException + } + + ServerConfiguration killedBookieConf = killBookie(1); + /* + * this decommissionBookie should make sure that there are no + * underreplicated ledgers because of this bookie + */ + bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf)); + bkAdmin.triggerAudit(); + Thread.sleep(500); + Iterator<Long>
ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null); + if (ledgersToRereplicate.hasNext()) { + while (ledgersToRereplicate.hasNext()) { + Long ledgerId = ledgersToRereplicate.next(); + LOG.error("Ledger: {} is underreplicated which is not expected", ledgerId); + } + fail("There are not supposed to be any underreplicatedledgers"); + } + + killedBookieConf = killBookie(0); + bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf)); + bkAdmin.triggerAudit(); + Thread.sleep(500); + ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null); + if (ledgersToRereplicate.hasNext()) { + while (ledgersToRereplicate.hasNext()) { + Long ledgerId = ledgersToRereplicate.next(); + LOG.error("Ledger: {} is underreplicated which is not expected", ledgerId); + } + fail("There are not supposed to be any underreplicatedledgers"); + } + bkAdmin.close(); + } + + @Test(timeout = 240000) + public void testDecommissionForLedgersWithMultipleSegmentsAndNotWriteClosed() throws Exception { + ZkLedgerUnderreplicationManager urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc); + BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString()); + int numOfEntries = 2 * numOfBookies; + + LedgerHandle lh1 = bkc.createLedgerAdv(1L, numBookies, 3, 3, digestType, PASSWORD.getBytes(), null); + LedgerHandle lh2 = bkc.createLedgerAdv(2L, numBookies, 3, 3, digestType, PASSWORD.getBytes(), null); + LedgerHandle lh3 = bkc.createLedgerAdv(3L, numBookies, 3, 3, digestType, PASSWORD.getBytes(), null); + LedgerHandle lh4 = bkc.createLedgerAdv(4L, numBookies, 3, 3, digestType, PASSWORD.getBytes(), null); + for (int j = 0; j < numOfEntries; j++) { + lh1.addEntry(j, "data".getBytes()); + lh2.addEntry(j, "data".getBytes()); + lh3.addEntry(j, "data".getBytes()); + lh4.addEntry(j, "data".getBytes()); + } + + startNewBookie(); + + assertEquals("Number of Available Bookies", numOfBookies + 1, bkAdmin.getAvailableBookies().size()); + + 
ServerConfiguration killedBookieConf = killBookie(0); + + /* + * since one of the bookies is killed, an ensemble change happens when the next + * write is made, so a new segment will be created for those 2 ledgers. + */ + for (int j = numOfEntries; j < 2 * numOfEntries; j++) { + lh1.addEntry(j, "data".getBytes()); + lh2.addEntry(j, "data".getBytes()); + } + + /* + * Here lh1 and lh2 have multiple segments and are writeclosed. But lh3 and lh4 are + * not writeclosed and contain only one segment. + */ + lh1.close(); + lh2.close(); + + /* + * If the last segment of the ledger is underreplicated and if the + * ledger is not closed then it will remain underreplicated for + * openLedgerRereplicationGracePeriod (by default 30 secs). For more + * info, check BOOKKEEPER-237 and BOOKKEEPER-325. Later, the + * ReplicationWorker will fence the ledger. + */ + bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf)); + bkAdmin.triggerAudit(); + Thread.sleep(500); + Iterator<Long> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null); + if (ledgersToRereplicate.hasNext()) { + while (ledgersToRereplicate.hasNext()) { + Long ledgerId = ledgersToRereplicate.next(); + LOG.error("Ledger: {} is underreplicated which is not expected", ledgerId); + } + fail("There are not supposed to be any underreplicatedledgers"); + } + bkAdmin.close(); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java index 103f1ce..f2a7316 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java @@ -20,6 +20,11 @@ */ package org.apache.bookkeeper.replication; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; 
+import static org.junit.Assert.fail; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -33,6 +38,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -40,13 +46,19 @@ import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.LedgerMetadata; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat; import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.test.MultiLedgerManagerTestCase; +import org.apache.commons.lang.mutable.MutableInt; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -56,8 +68,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.*; - /** * Tests publishing of under replicated ledgers by the Auditor bookie node when * corresponding bookies are identified as not running @@ -337,7 +347,7 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase { .size()); // wait for 5 seconds before starting the 
recovery work when a bookie fails - baseConf.setLostBookieRecoveryDelay(5); + urLedgerMgr.setLostBookieRecoveryDelay(5); // shutdown a non auditor bookie; choosing non-auditor to avoid another election String shutdownBookie = shutDownNonAuditorBookie(); @@ -395,6 +405,247 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase { _testDelayedAuditOfLostBookies(); } + @Test(timeout=60000) + public void testRescheduleOfDelayedAuditOfLostBookiesToStartImmediately() throws Exception { + // wait for a second so that the initial periodic check finishes + Thread.sleep(1000); + + LedgerHandle lh1 = createAndAddEntriesToLedger(); + Long ledgerId = lh1.getId(); + LOG.debug("Created ledger : " + ledgerId); + ledgerList.add(ledgerId); + lh1.close(); + + final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList + .size()); + + // wait for 50 seconds before starting the recovery work when a bookie fails + urLedgerMgr.setLostBookieRecoveryDelay(50); + + // shutdown a non auditor bookie; choosing non-auditor to avoid another election + String shutdownBookie = shutDownNonAuditorBookie(); + + LOG.debug("Waiting for ledgers to be marked as under replicated"); + assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(4, TimeUnit.SECONDS)); + assertEquals("under replicated ledgers identified when it was not expected", 0, + urLedgerList.size()); + + // set lostBookieRecoveryDelay to 0, so that it triggers AuditTask immediately + urLedgerMgr.setLostBookieRecoveryDelay(0); + + // wait for 1 second for the ledger to get reported as under replicated + assertTrue("audit of lost bookie isn't delayed", underReplicaLatch.await(1, TimeUnit.SECONDS)); + + assertTrue("Ledger is not marked as underreplicated:" + ledgerId, + urLedgerList.contains(ledgerId)); + Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList); + String data = urLedgerData.get(ledgerId); + assertTrue("Bookie " + shutdownBookie + + "is not listed in the ledger as missing replica :" 
+ data, + data.contains(shutdownBookie)); + } + + @Test(timeout=60000) + public void testRescheduleOfDelayedAuditOfLostBookiesToStartLater() throws Exception { + // wait for a second so that the initial periodic check finishes + Thread.sleep(1000); + + LedgerHandle lh1 = createAndAddEntriesToLedger(); + Long ledgerId = lh1.getId(); + LOG.debug("Created ledger : " + ledgerId); + ledgerList.add(ledgerId); + lh1.close(); + + final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList + .size()); + + // wait for 3 seconds before starting the recovery work when a bookie fails + urLedgerMgr.setLostBookieRecoveryDelay(3); + + // shutdown a non auditor bookie; choosing non-auditor to avoid another election + String shutdownBookie = shutDownNonAuditorBookie(); + + LOG.debug("Waiting for ledgers to be marked as under replicated"); + assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS)); + assertEquals("under replicated ledgers identified when it was not expected", 0, + urLedgerList.size()); + + // set lostBookieRecoveryDelay to 4, so the pending AuditTask is resheduled + urLedgerMgr.setLostBookieRecoveryDelay(4); + + // since we changed the BookieRecoveryDelay period to 4, the audittask shouldn't have been executed + LOG.debug("Waiting for ledgers to be marked as under replicated"); + assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS)); + assertEquals("under replicated ledgers identified when it was not expected", 0, + urLedgerList.size()); + + // wait for 3 seconds (since we already waited for 2 secs) for the ledger to get reported as under replicated + assertTrue("audit of lost bookie isn't delayed", underReplicaLatch.await(3, TimeUnit.SECONDS)); + assertTrue("Ledger is not marked as underreplicated:" + ledgerId, + urLedgerList.contains(ledgerId)); + Map urLedgerData = getUrLedgerData(urLedgerList); + String data = urLedgerData.get(ledgerId); + assertTrue("Bookie " + 
shutdownBookie + + "is not listed in the ledger as missing replica :" + data, + data.contains(shutdownBookie)); + } + + @Test(timeout=60000) + public void testTriggerAuditorWithNoPendingAuditTask() throws Exception { + // wait for a second so that the initial periodic check finishes + Thread.sleep(1000); + int lostBookieRecoveryDelayConfValue = baseConf.getLostBookieRecoveryDelay(); + Auditor auditorBookiesAuditor = getAuditorBookiesAuditor(); + Future auditTask = auditorBookiesAuditor.getAuditTask(); + int lostBookieRecoveryDelayBeforeChange = auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange(); + Assert.assertEquals("auditTask is supposed to be null", null, auditTask); + Assert.assertEquals( + "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to BaseConf's lostBookieRecoveryDelay", + lostBookieRecoveryDelayConfValue, lostBookieRecoveryDelayBeforeChange); + + // there is no easy way to validate if the Auditor has executed Audit process (Auditor.startAudit), + // without shuttingdown Bookie. To test if by resetting LostBookieRecoveryDelay it does Auditing + // even when there is no pending AuditTask, following approach is needed. + + // Here we are creating few ledgers ledgermetadata with non-existing bookies as its ensemble. + // When Auditor does audit it recognizes these ledgers as underreplicated and mark them as + // under-replicated, since these bookies are not available. 
+ int numofledgers = 5; + Random rand = new Random(); + for (int i = 0; i < numofledgers; i++) { + LedgerMetadata metadata = new LedgerMetadata(3, 2, 2, DigestType.CRC32, "passwd".getBytes(), null); + ArrayList ensemble = new ArrayList(); + ensemble.add(new BookieSocketAddress("99.99.99.99:9999")); + ensemble.add(new BookieSocketAddress("11.11.11.11:1111")); + ensemble.add(new BookieSocketAddress("88.88.88.88:8888")); + metadata.addEnsemble(0, ensemble); + LedgerManager ledgerManager = LedgerManagerFactory.newLedgerManagerFactory(baseClientConf, zkc) + .newLedgerManager(); + MutableInt ledgerCreateRC = new MutableInt(-1); + CountDownLatch latch = new CountDownLatch(1); + long ledgerId = (Math.abs(rand.nextLong())) % 100000000; + ledgerManager.createLedgerMetadata(ledgerId, metadata, + new BookkeeperInternalCallbacks.GenericCallback() { + @Override + public void operationComplete(int rc, Void result) { + ledgerCreateRC.setValue(rc); + latch.countDown(); + } + }); + Assert.assertTrue("Ledger creation should complete within 2 secs", + latch.await(2000, TimeUnit.MILLISECONDS)); + Assert.assertEquals("LedgerCreate should succeed and return OK rc value", BKException.Code.OK, + ledgerCreateRC.getValue()); + ledgerList.add(ledgerId); + } + + final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size()); + urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelayBeforeChange); + assertTrue("Audit should be triggered and created ledgers should be marked as underreplicated", + underReplicaLatch.await(2, TimeUnit.SECONDS)); + assertEquals("All the ledgers should be marked as underreplicated", ledgerList.size(), urLedgerList.size()); + + auditTask = auditorBookiesAuditor.getAuditTask(); + Assert.assertEquals("auditTask is supposed to be null", null, auditTask); + Assert.assertEquals( + "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to BaseConf's lostBookieRecoveryDelay", + lostBookieRecoveryDelayBeforeChange, 
auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange()); + } + + @Test(timeout=60000) + public void testTriggerAuditorWithPendingAuditTask() throws Exception { + // wait for a second so that the initial periodic check finishes + Thread.sleep(1000); + + Auditor auditorBookiesAuditor = getAuditorBookiesAuditor(); + LedgerHandle lh1 = createAndAddEntriesToLedger(); + Long ledgerId = lh1.getId(); + LOG.debug("Created ledger : " + ledgerId); + ledgerList.add(ledgerId); + lh1.close(); + + final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList + .size()); + + int lostBookieRecoveryDelay = 5; + // wait for 5 seconds before starting the recovery work when a bookie fails + urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay); + + // shutdown a non auditor bookie; choosing non-auditor to avoid another election + String shutdownBookie = shutDownNonAuditorBookie(); + + LOG.debug("Waiting for ledgers to be marked as under replicated"); + assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS)); + assertEquals("under replicated ledgers identified when it was not expected", 0, + urLedgerList.size()); + + Future auditTask = auditorBookiesAuditor.getAuditTask(); + Assert.assertNotEquals("auditTask is not supposed to be null", null, auditTask); + Assert.assertEquals( + "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set", + lostBookieRecoveryDelay, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange()); + + // set lostBookieRecoveryDelay to 5 (previous value), so that Auditor is triggered immediately + urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay); + assertTrue("audit of lost bookie shouldn't be delayed", underReplicaLatch.await(2, TimeUnit.SECONDS)); + assertEquals("all under replicated ledgers should be identified", ledgerList.size(), + urLedgerList.size()); + + Thread.sleep(100); + auditTask = auditorBookiesAuditor.getAuditTask(); + 
Assert.assertEquals("auditTask is supposed to be null", null, auditTask); + Assert.assertEquals( + "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to previously set value", + lostBookieRecoveryDelay, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange()); + } + + @Test(timeout=60000) + public void testTriggerAuditorBySettingDelayToZeroWithPendingAuditTask() throws Exception { + // wait for a second so that the initial periodic check finishes + Thread.sleep(1000); + + Auditor auditorBookiesAuditor = getAuditorBookiesAuditor(); + LedgerHandle lh1 = createAndAddEntriesToLedger(); + Long ledgerId = lh1.getId(); + LOG.debug("Created ledger : " + ledgerId); + ledgerList.add(ledgerId); + lh1.close(); + + final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList + .size()); + + int lostBookieRecoveryDelay = 5; + // wait for 5 seconds before starting the recovery work when a bookie fails + urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay); + + // shutdown a non auditor bookie; choosing non-auditor to avoid another election + String shutdownBookie = shutDownNonAuditorBookie(); + + LOG.debug("Waiting for ledgers to be marked as under replicated"); + assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS)); + assertEquals("under replicated ledgers identified when it was not expected", 0, + urLedgerList.size()); + + Future auditTask = auditorBookiesAuditor.getAuditTask(); + Assert.assertNotEquals("auditTask is not supposed to be null", null, auditTask); + Assert.assertEquals( + "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set", + lostBookieRecoveryDelay, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange()); + + // set lostBookieRecoveryDelay to 0, so that Auditor is triggered immediately + urLedgerMgr.setLostBookieRecoveryDelay(0); + assertTrue("audit of lost bookie shouldn't be delayed", underReplicaLatch.await(1, TimeUnit.SECONDS)); + 
assertEquals("all under replicated ledgers should be identified", ledgerList.size(), + urLedgerList.size()); + + Thread.sleep(100); + auditTask = auditorBookiesAuditor.getAuditTask(); + Assert.assertEquals("auditTask is supposed to be null", null, auditTask); + Assert.assertEquals( + "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to previously set value", + 0, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange()); + } + /** * Test audit of bookies is delayed when one bookie is down. But when * another one goes down, the audit is started immediately. @@ -414,7 +665,7 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase { CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size()); // wait for 10 seconds before starting the recovery work when a bookie fails - baseConf.setLostBookieRecoveryDelay(10); + urLedgerMgr.setLostBookieRecoveryDelay(10); // shutdown a non auditor bookie to avoid an election String shutdownBookie1 = shutDownNonAuditorBookie(); @@ -467,7 +718,7 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase { CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size()); // wait for 5 seconds before starting the recovery work when a bookie fails - baseConf.setLostBookieRecoveryDelay(5); + urLedgerMgr.setLostBookieRecoveryDelay(5); // shutdown a non auditor bookie to avoid an election int idx1 = getShutDownNonAuditorBookieIdx(""); @@ -647,6 +898,12 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase { return auditors.get(0); } + private Auditor getAuditorBookiesAuditor() throws Exception { + BookieServer auditorBookieServer = getAuditorBookie(); + String bookieAddr = auditorBookieServer.getLocalAddress().toString(); + return auditorElectors.get(bookieAddr).auditor; + } + private String shutDownNonAuditorBookie() throws Exception { // shutdown bookie which is not an auditor int indexOf = bs.indexOf(getAuditorBookie()); -- To stop 
receiving notification emails like this one, please contact ['"commits@bookkeeper.apache.org" '].
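The new tests pin down how the Auditor reacts when LostBookieRecoveryDelay is rewritten in ZooKeeper. In the tests it is written through `urLedgerMgr`:

- Setting it to 0, or writing back the value it already had, runs the audit immediately and leaves no pending AuditTask.
- Any other value reschedules a pending AuditTask.

The test timings (delay 3, change to 4 after 2 s, no audit 2 s later) are consistent with restarting the full new delay from the moment of the change. The existing Javadoc adds a third rule: a second lost bookie starts the audit immediately.

Below is a minimal sketch of these rules under stated assumptions. `DelayedAuditScheduler`, its methods, and the counter standing in for the audit are all hypothetical; this is not BookKeeper's `Auditor` API. The sketch also assumes, without confirmation from this commit, that a new non-zero delay with nothing pending does nothing.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified model of the delayed-audit rules exercised by the tests.
 * Not BookKeeper's Auditor: the real ledger audit is replaced by a counter.
 */
class DelayedAuditScheduler {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private int delaySeconds;              // last value seen (role of lostBookieRecoveryDelayBeforeChange)
    private ScheduledFuture<?> auditTask;  // pending delayed audit, or null
    private long generation;               // bumped on every cancel/schedule to invalidate stale tasks
    private int auditCount;                // number of audits "executed"

    DelayedAuditScheduler(int initialDelaySeconds) {
        this.delaySeconds = initialDelaySeconds;
    }

    /** A bookie was lost: audit now if delay is 0 or an audit is already pending, else schedule one. */
    synchronized void onBookieLost() {
        if (delaySeconds == 0 || auditTask != null) {
            cancelPending();
            runAudit();
        } else {
            schedule(delaySeconds);
        }
    }

    /** The delay was written again, with the same or a different value. */
    synchronized void onDelayChanged(int newDelaySeconds) {
        boolean hadPending = auditTask != null;
        cancelPending();
        if (newDelaySeconds == 0 || newDelaySeconds == delaySeconds) {
            runAudit();                  // force-trigger, with or without a pending task
        } else if (hadPending) {
            schedule(newDelaySeconds);   // restart the full new delay from now
        }
        delaySeconds = newDelaySeconds;
    }

    synchronized boolean hasPendingAudit() { return auditTask != null; }

    synchronized int auditsRun() { return auditCount; }

    void shutdown() { executor.shutdownNow(); }

    private void schedule(int seconds) {
        final long gen = ++generation;
        auditTask = executor.schedule(() -> runIfCurrent(gen), seconds, TimeUnit.SECONDS);
    }

    private void cancelPending() {
        if (auditTask != null) {
            auditTask.cancel(false);
            auditTask = null;
        }
        generation++;  // a task that already fired but is waiting for the lock becomes a no-op
    }

    private synchronized void runIfCurrent(long gen) {
        if (gen != generation) {
            return;    // superseded by a cancel or reschedule
        }
        auditTask = null;
        runAudit();
    }

    private void runAudit() { auditCount++; }  // placeholder for the real ledger audit
}
```

The generation counter is a design choice of this sketch. `cancel(false)` cannot stop a task that has already started and is blocked on the monitor. Checking the generation inside the synchronized body keeps such a stale task from running an extra audit after a reschedule.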