Return-Path: Delivered-To: apmail-hadoop-zookeeper-commits-archive@minotaur.apache.org Received: (qmail 14454 invoked from network); 9 Jul 2010 21:19:33 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 9 Jul 2010 21:19:33 -0000 Received: (qmail 9734 invoked by uid 500); 9 Jul 2010 21:19:33 -0000 Delivered-To: apmail-hadoop-zookeeper-commits-archive@hadoop.apache.org Received: (qmail 9703 invoked by uid 500); 9 Jul 2010 21:19:33 -0000 Mailing-List: contact zookeeper-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: zookeeper-dev@ Delivered-To: mailing list zookeeper-commits@hadoop.apache.org Received: (qmail 9695 invoked by uid 99); 9 Jul 2010 21:19:33 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Jul 2010 21:19:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Jul 2010 21:19:25 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 242CF23888D2; Fri, 9 Jul 2010 21:18:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r962697 - in /hadoop/zookeeper/trunk: ./ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/ src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ Date: Fri, 09 Jul 2010 21:18:00 -0000 To: zookeeper-commits@hadoop.apache.org From: breed@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100709211801.242CF23888D2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: breed Date: Fri Jul 9 21:17:57 2010 New Revision: 962697 URL: http://svn.apache.org/viewvc?rev=962697&view=rev Log: ZOOKEEPER-712. Bookie recovery. 
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=962697&r1=962696&r2=962697&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Fri Jul 9 21:17:57 2010 @@ -99,6 +99,8 @@ NEW FEATURES: ZOOKEEPER-744. Add monitoring four-letter word (Savu Andrei via phunt) + ZOOKEEPER-712. Bookie recovery. 
(erwin tam via breed) + Release 3.3.0 - 2010-03-24 Non-backward compatible changes: Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java?rev=962697&r1=962696&r2=962697&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java Fri Jul 9 21:17:57 2010 @@ -111,4 +111,16 @@ public interface AsyncCallback { void deleteComplete(int rc, Object ctx); } + public interface RecoverCallback { + /** + * Callback definition for bookie recover operations + * + * @param rc + * return code + * @param ctx + * control object + */ + void recoverComplete(int rc, Object ctx); + } + } Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=962697&r1=962696&r2=962697&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Fri Jul 9 21:17:57 2010 @@ -159,6 +159,15 @@ public class BookKeeper implements OpenC } /** + * Get the BookieClient, currently used for doing bookie recovery. + * + * @return BookieClient for the BookKeeper instance. + */ + public BookieClient getBookieClient() { + return bookieClient; + } + + /** * Creates a new ledger asynchronously. 
To create a ledger, we need to specify * the ensemble size, the quorum size, the digest type, a password, a callback * implementation, and an optional control object. The ensemble size is how Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java?rev=962697&r1=962696&r2=962697&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java Fri Jul 9 21:17:57 2010 @@ -35,7 +35,7 @@ import org.jboss.netty.buffer.ChannelBuf * for the packet. Currently 2 types of digests are supported: MAC (based on SHA-1) and CRC32 */ -abstract class DigestManager { +public abstract class DigestManager { static final Logger logger = Logger.getLogger(DigestManager.class); long ledgerId; @@ -67,7 +67,7 @@ abstract class DigestManager { } } - ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, byte[] data) { + public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, byte[] data) { byte[] bufferArray = new byte[24+macCodeLength]; ByteBuffer buffer = ByteBuffer.wrap(bufferArray); Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java?rev=962697&r1=962696&r2=962697&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java (original) +++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java Fri Jul 9 21:17:57 2010 @@ -28,7 +28,7 @@ package org.apache.bookkeeper.client; * to. */ -interface DistributionSchedule { +public interface DistributionSchedule { /** * Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=962697&r1=962696&r2=962697&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Jul 9 21:17:57 2010 @@ -121,7 +121,43 @@ public class LedgerHandle implements Rea return lastAddPushed; } - void writeLedgerConfig(StatCallback callback, Object ctx) { + /** + * Get the Ledger's key/password. + * + * @return byte array for the ledger's key/password. 
+ */ + public byte[] getLedgerKey() { + return ledgerKey; + } + + /** + * Get the LedgerMetadata + * + * @return LedgerMetadata for the LedgerHandle + */ + public LedgerMetadata getLedgerMetadata() { + return metadata; + } + + /** + * Get the DigestManager + * + * @return DigestManager for the LedgerHandle + */ + public DigestManager getDigestManager() { + return macManager; + } + + /** + * Get the Distribution Schedule + * + * @return DistributionSchedule for the LedgerHandle + */ + public DistributionSchedule getDistributionSchedule() { + return distributionSchedule; + } + + public void writeLedgerConfig(StatCallback callback, Object ctx) { bk.getZkHandle().setData(StringUtils.getLedgerNodePath(ledgerId), metadata.serialize(), -1, callback, ctx); } Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=962697&r1=962696&r2=962697&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java Fri Jul 9 21:17:57 2010 @@ -33,7 +33,7 @@ import org.apache.log4j.Logger; * in zookeeper. It provides parsing and serialization methods of such metadata. * */ -class LedgerMetadata { +public class LedgerMetadata { static final Logger LOG = Logger.getLogger(LedgerMetadata.class); private static final String closed = "CLOSED"; @@ -59,6 +59,17 @@ class LedgerMetadata { this(0, 0); } + /** + * Get the Map of bookie ensembles for the various ledger fragments + * that make up the ledger. + * + * @return SortedMap of Ledger Fragments and the corresponding + * bookie ensembles that store the entries. 
+ */ + public SortedMap<Long, ArrayList<InetSocketAddress>> getEnsembles() { + return ensembles; + } + boolean isClosed() { return close != NOTCLOSED; } Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java?rev=962697&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java (added) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java Fri Jul 9 21:17:57 2010 @@ -0,0 +1,762 @@ +package org.apache.bookkeeper.tools; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ * + */ + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.log4j.Logger; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.data.Stat; +import org.jboss.netty.buffer.ChannelBuffer; + +/** + * Provides Admin Tools to manage the BookKeeper cluster. + * + */ +public class BookKeeperTools { + + private static Logger LOG = Logger.getLogger(BookKeeperTools.class); + + // ZK client instance + private ZooKeeper zk; + // ZK ledgers related String constants + static final String LEDGERS_PATH = "/ledgers"; + static final String LEDGER_NODE_PREFIX = "L"; + static final String AVAILABLE_NODE = "available"; + static final String BOOKIES_PATH = LEDGERS_PATH + "/" + AVAILABLE_NODE; + static final String COLON = ":"; + + // BookKeeper client instance + private BookKeeper bkc; + + /* + * Random number generator used to choose an available bookie server to + * replicate data from a dead bookie. 
+ */ + private Random rand = new Random(); + + /* + * For now, assume that all ledgers were created with the same DigestType + * and password. In the future, this admin tool will need to know for each + * ledger, what was the DigestType and password used to create it before it + * can open it. These values will come from System properties, though hard + * coded defaults are defined here. + */ + private DigestType DIGEST_TYPE = DigestType.valueOf(System.getProperty("digestType", DigestType.CRC32.toString())); + private byte[] PASSWD = System.getProperty("passwd", "").getBytes(); + + /** + * Constructor that takes in a ZooKeeper servers connect string so we know + * how to connect to ZooKeeper to retrieve information about the BookKeeper + * cluster. We need this before we can do any type of admin operations on + * the BookKeeper cluster. + * + * @param zkServers + * Comma separated list of hostname:port pairs for the ZooKeeper + * servers cluster. + * @throws IOException + * Throws this exception if there is an error instantiating the + * ZooKeeper client. + * @throws InterruptedException + * Throws this exception if there is an error instantiating the + * BookKeeper client. + * @throws KeeperException + * Throws this exception if there is an error instantiating the + * BookKeeper client. + */ + public BookKeeperTools(String zkServers) throws IOException, InterruptedException, KeeperException { + // Create the ZooKeeper client instance + zk = new ZooKeeper(zkServers, 10000, new Watcher() { + @Override + public void process(WatchedEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("Process: " + event.getType() + " " + event.getPath()); + } + } + }); + // Create the BookKeeper client instance + bkc = new BookKeeper(zk); + } + + /** + * Shutdown method to gracefully release resources that this class uses. + * + * @throws InterruptedException + * if there is an error shutting down the clients that this + * class uses. 
+ */ + public void shutdown() throws InterruptedException { + bkc.halt(); + zk.close(); + } + + /** + * This is a multi callback object for bookie recovery that waits for all of + * the multiple async operations to complete. If any fail, then we invoke + * the final callback with a BK LedgerRecoveryException. + */ + class MultiCallback implements AsyncCallback.VoidCallback { + // Number of expected callbacks + final int expected; + // Final callback and the corresponding context to invoke + final AsyncCallback.VoidCallback cb; + final Object context; + // This keeps track of how many operations have completed + final AtomicInteger done = new AtomicInteger(); + // List of the exceptions from operations that completed unsuccessfully + final LinkedBlockingQueue exceptions = new LinkedBlockingQueue(); + + MultiCallback(int expected, AsyncCallback.VoidCallback cb, Object context) { + this.expected = expected; + this.cb = cb; + this.context = context; + if (expected == 0) { + cb.processResult(Code.OK.intValue(), null, context); + } + } + + private void tick() { + if (done.incrementAndGet() == expected) { + if (exceptions.isEmpty()) { + cb.processResult(Code.OK.intValue(), null, context); + } else { + cb.processResult(BKException.Code.LedgerRecoveryException, null, context); + } + } + } + + @Override + public void processResult(int rc, String path, Object ctx) { + if (rc != Code.OK.intValue()) { + LOG.error("BK error recovering ledger data", BKException.create(rc)); + exceptions.add(rc); + } + tick(); + } + + } + + /** + * Method to get the input ledger's digest type. For now, this is just a + * placeholder function since there is no way we can get this information + * easily. In the future, BookKeeper should store this ledger metadata + * somewhere such that an admin tool can access it. + * + * @param ledgerId + * LedgerId we are retrieving the digestType for. 
+ * @return DigestType for the input ledger + */ + private DigestType getLedgerDigestType(long ledgerId) { + return DIGEST_TYPE; + } + + /** + * Method to get the input ledger's password. For now, this is just a + * placeholder function since there is no way we can get this information + * easily. In the future, BookKeeper should store this ledger metadata + * somewhere such that an admin tool can access it. + * + * @param ledgerId + * LedgerId we are retrieving the password for. + * @return Password for the input ledger + */ + private byte[] getLedgerPasswd(long ledgerId) { + return PASSWD; + } + + // Object used for calling async methods and waiting for them to complete. + class SyncObject { + boolean value; + + public SyncObject() { + value = false; + } + } + + /** + * Synchronous method to rebuild and recover the ledger fragments data that + * was stored on the source bookie. That bookie could have failed completely + * and now the ledger data that was stored on it is under replicated. An + * optional destination bookie server could be given if we want to copy all + * of the ledger fragments data on the failed source bookie to it. + * Otherwise, we will just randomly distribute the ledger fragments to the + * active set of bookies, perhaps based on load. All ZooKeeper ledger + * metadata will be updated to point to the new bookie(s) that contain the + * replicated ledger fragments. + * + * @param bookieSrc + * Source bookie that had a failure. We want to replicate the + * ledger fragments that were stored there. + * @param bookieDest + * Optional destination bookie that if passed, we will copy all + * of the ledger fragments from the source bookie over to it. + */ + public void recoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest) + throws InterruptedException { + SyncObject sync = new SyncObject(); + // Call the async method to recover bookie data. 
+ asyncRecoverBookieData(bookieSrc, bookieDest, new RecoverCallback() { + @Override + public void recoverComplete(int rc, Object ctx) { + LOG.info("Recover bookie operation completed with rc: " + rc); + SyncObject syncObj = (SyncObject) ctx; + synchronized (syncObj) { + syncObj.value = true; + syncObj.notify(); + } + } + }, sync); + + // Wait for the async method to complete. + synchronized (sync) { + while (sync.value == false) { + sync.wait(); + } + } + } + + /** + * Async method to rebuild and recover the ledger fragments data that was + * stored on the source bookie. That bookie could have failed completely and + * now the ledger data that was stored on it is under replicated. An + * optional destination bookie server could be given if we want to copy all + * of the ledger fragments data on the failed source bookie to it. + * Otherwise, we will just randomly distribute the ledger fragments to the + * active set of bookies, perhaps based on load. All ZooKeeper ledger + * metadata will be updated to point to the new bookie(s) that contain the + * replicated ledger fragments. + * + * @param bookieSrc + * Source bookie that had a failure. We want to replicate the + * ledger fragments that were stored there. + * @param bookieDest + * Optional destination bookie that if passed, we will copy all + * of the ledger fragments from the source bookie over to it. + * @param cb + * RecoverCallback to invoke once all of the data on the dead + * bookie has been recovered and replicated. + * @param context + * Context for the RecoverCallback to call. + */ + public void asyncRecoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest, + final RecoverCallback cb, final Object context) { + // Sync ZK to make sure we're reading the latest bookie/ledger data. 
+ zk.sync(LEDGERS_PATH, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + if (rc != Code.OK.intValue()) { + LOG.error("ZK error syncing: ", KeeperException.create(KeeperException.Code.get(rc), path)); + cb.recoverComplete(BKException.Code.ZKException, context); + return; + } + getAvailableBookies(bookieSrc, bookieDest, cb, context); + }; + }, null); + } + + /** + * This method asynchronously gets the set of available Bookies that the + * dead input bookie's data will be copied over into. If the user passed in + * a specific destination bookie, then just use that one. Otherwise, we'll + * randomly pick one of the other available bookies to use for each ledger + * fragment we are replicating. + * + * @param bookieSrc + * Source bookie that had a failure. We want to replicate the + * ledger fragments that were stored there. + * @param bookieDest + * Optional destination bookie that if passed, we will copy all + * of the ledger fragments from the source bookie over to it. + * @param cb + * RecoverCallback to invoke once all of the data on the dead + * bookie has been recovered and replicated. + * @param context + * Context for the RecoverCallback to call. 
+ */ + private void getAvailableBookies(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest, + final RecoverCallback cb, final Object context) { + final List availableBookies = new LinkedList(); + if (bookieDest != null) { + availableBookies.add(bookieDest); + // Now poll ZK to get the active ledgers + getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies); + } else { + zk.getChildren(BOOKIES_PATH, null, new AsyncCallback.ChildrenCallback() { + @Override + public void processResult(int rc, String path, Object ctx, List children) { + if (rc != Code.OK.intValue()) { + LOG.error("ZK error getting bookie nodes: ", KeeperException.create(KeeperException.Code + .get(rc), path)); + cb.recoverComplete(BKException.Code.ZKException, context); + return; + } + for (String bookieNode : children) { + String parts[] = bookieNode.split(COLON); + if (parts.length < 2) { + LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode); + cb.recoverComplete(BKException.Code.ZKException, context); + return; + } + availableBookies.add(new InetSocketAddress(parts[0], Integer.parseInt(parts[1]))); + } + // Now poll ZK to get the active ledgers + getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies); + } + }, null); + } + } + + /** + * This method asynchronously polls ZK to get the current set of active + * ledgers. From this, we can open each ledger and look at the metadata to + * determine if any of the ledger fragments for it were stored at the dead + * input bookie. + * + * @param bookieSrc + * Source bookie that had a failure. We want to replicate the + * ledger fragments that were stored there. + * @param bookieDest + * Optional destination bookie that if passed, we will copy all + * of the ledger fragments from the source bookie over to it. + * @param cb + * RecoverCallback to invoke once all of the data on the dead + * bookie has been recovered and replicated. 
+ * @param context + * Context for the RecoverCallback to call. + * @param availableBookies + * List of Bookie Servers that are available to use for + * replicating data on the failed bookie. This could contain a + * single bookie server if the user explicitly chose a bookie + * server to replicate data to. + */ + private void getActiveLedgers(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest, + final RecoverCallback cb, final Object context, final List availableBookies) { + zk.getChildren(LEDGERS_PATH, null, new AsyncCallback.ChildrenCallback() { + @Override + public void processResult(int rc, String path, Object ctx, List children) { + if (rc != Code.OK.intValue()) { + LOG.error("ZK error getting ledger nodes: ", KeeperException.create(KeeperException.Code.get(rc), + path)); + cb.recoverComplete(BKException.Code.ZKException, context); + return; + } + // Wrapper class around the RecoverCallback so it can be used + // as the final VoidCallback to invoke within the MultiCallback. + class RecoverCallbackWrapper implements AsyncCallback.VoidCallback { + final RecoverCallback cb; + + RecoverCallbackWrapper(RecoverCallback cb) { + this.cb = cb; + } + + @Override + public void processResult(int rc, String path, Object ctx) { + cb.recoverComplete(rc, ctx); + } + } + // Recover each of the ledgers asynchronously + MultiCallback ledgerMcb = new MultiCallback(children.size(), new RecoverCallbackWrapper(cb), context); + for (final String ledgerNode : children) { + recoverLedger(bookieSrc, ledgerNode, ledgerMcb, availableBookies); + } + } + }, null); + } + + /** + * This method asynchronously recovers a given ledger if any of the ledger + * entries were stored on the failed bookie. + * + * @param bookieSrc + * Source bookie that had a failure. We want to replicate the + * ledger fragments that were stored there. + * @param ledgerNode + * Ledger Node name as retrieved from ZooKeeper we want to + * recover. 
+ * @param ledgerMcb + * MultiCallback to invoke once we've recovered the current + * ledger. + * @param availableBookies + * List of Bookie Servers that are available to use for + * replicating data on the failed bookie. This could contain a + * single bookie server if the user explicitly chose a bookie + * server to replicate data to. + */ + private void recoverLedger(final InetSocketAddress bookieSrc, final String ledgerNode, + final MultiCallback ledgerMcb, final List availableBookies) { + /* + * The available node is also stored in this path so ignore that. That + * node is the path for the set of available Bookie Servers. + */ + if (ledgerNode.equals(AVAILABLE_NODE)) { + ledgerMcb.processResult(BKException.Code.OK, null, null); + return; + } + // Parse out the ledgerId from the ZK ledger node. + String parts[] = ledgerNode.split(LEDGER_NODE_PREFIX); + if (parts.length < 2) { + LOG.error("Ledger Node retrieved from ZK has invalid name format: " + ledgerNode); + ledgerMcb.processResult(BKException.Code.ZKException, null, null); + return; + } + final long lId; + try { + lId = Long.parseLong(parts[parts.length - 1]); + } catch (NumberFormatException e) { + LOG.error("Error retrieving ledgerId from ledgerNode: " + ledgerNode, e); + ledgerMcb.processResult(BKException.Code.ZKException, null, null); + return; + } + /* + * For the current ledger, open it to retrieve the LedgerHandle. This + * will contain the LedgerMetadata indicating which bookie servers the + * ledger fragments are stored on. Check if any of the ledger fragments + * for the current ledger are stored on the input dead bookie. 
+ */ + DigestType digestType = getLedgerDigestType(lId); + byte[] passwd = getLedgerPasswd(lId); + bkc.asyncOpenLedger(lId, digestType, passwd, new OpenCallback() { + @Override + public void openComplete(int rc, final LedgerHandle lh, Object ctx) { + if (rc != Code.OK.intValue()) { + LOG.error("BK error opening ledger: " + lId, BKException.create(rc)); + ledgerMcb.processResult(rc, null, null); + return; + } + /* + * This List stores the ledger fragments to recover indexed by + * the start entry ID for the range. The ensembles TreeMap is + * keyed off this. + */ + final List<Long> ledgerFragmentsToRecover = new LinkedList<Long>(); + /* + * This Map will store the start and end entry ID values for + * each of the ledger fragment ranges. The only exception is the + * current active fragment since it has no end yet. In the event + * of a bookie failure, a new ensemble is created so the current + * ensemble should not contain the dead bookie we are trying to + * recover. + */ + Map<Long, Long> ledgerFragmentsRange = new HashMap<Long, Long>(); + Long curEntryId = null; + for (Map.Entry<Long, ArrayList<InetSocketAddress>> entry : lh.getLedgerMetadata().getEnsembles() + .entrySet()) { + if (curEntryId != null) + ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1); + curEntryId = entry.getKey(); + if (entry.getValue().contains(bookieSrc)) { + /* + * Current ledger fragment has entries stored on the + * dead bookie so we'll need to recover them. + */ + ledgerFragmentsToRecover.add(entry.getKey()); + } + } + /* + * See if this current ledger contains any ledger fragment that + * needs to be re-replicated. If not, then just invoke the + * multiCallback and return. + */ + if (ledgerFragmentsToRecover.size() == 0) { + ledgerMcb.processResult(BKException.Code.OK, null, null); + return; + } + /* + * We have ledger fragments that need to be re-replicated to a + * new bookie. Choose one randomly from the available set of + * bookies.
+ */ + final InetSocketAddress newBookie = availableBookies.get(rand.nextInt(availableBookies.size())); + + /* + * Wrapper class around the ledger MultiCallback. Once all + * ledger fragments for the ledger have been replicated to a new + * bookie, we need to update ZK with this new metadata to point + * to the new bookie instead of the old dead one. That should be + * done at the end prior to invoking the ledger MultiCallback. + */ + class LedgerMultiCallbackWrapper implements AsyncCallback.VoidCallback { + final MultiCallback ledgerMcb; + + LedgerMultiCallbackWrapper(MultiCallback ledgerMcb) { + this.ledgerMcb = ledgerMcb; + } + + @Override + public void processResult(int rc, String path, Object ctx) { + if (rc != Code.OK.intValue()) { + LOG.error("BK error replicating ledger fragments for ledger: " + lId, BKException + .create(rc)); + ledgerMcb.processResult(rc, null, null); + return; + } + /* + * Update the ledger metadata's ensemble info to point + * to the new bookie. + */ + for (final Long startEntryId : ledgerFragmentsToRecover) { + ArrayList ensemble = lh.getLedgerMetadata().getEnsembles().get( + startEntryId); + int deadBookieIndex = ensemble.indexOf(bookieSrc); + ensemble.remove(deadBookieIndex); + ensemble.add(deadBookieIndex, newBookie); + } + lh.writeLedgerConfig(new AsyncCallback.StatCallback() { + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + if (rc != Code.OK.intValue()) { + LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(), + KeeperException.create(KeeperException.Code.get(rc), path)); + } else { + LOG.info("Updated ZK for ledgerId: (" + lh.getId() + + ") to point ledger fragments from old dead bookie: (" + bookieSrc + + ") to new bookie: (" + newBookie + ")"); + } + /* + * Pass the return code result up the chain with + * the parent callback. 
+ */ + ledgerMcb.processResult(rc, null, null); + } + }, null); + } + } + + /* + * Now recover all of the necessary ledger fragments + * asynchronously using a MultiCallback for every fragment. + */ + MultiCallback ledgerFragmentMcb = new MultiCallback(ledgerFragmentsToRecover.size(), + new LedgerMultiCallbackWrapper(ledgerMcb), null); + for (final Long startEntryId : ledgerFragmentsToRecover) { + Long endEntryId = ledgerFragmentsRange.get(startEntryId); + try { + recoverLedgerFragment(bookieSrc, lh, startEntryId, endEntryId, ledgerFragmentMcb, newBookie); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + }, null); + } + + /** + * This method asynchronously recovers a ledger fragment which is a + * contiguous portion of a ledger that was stored in an ensemble that + * included the failed bookie. + * + * @param bookieSrc + * Source bookie that had a failure. We want to replicate the + * ledger fragments that were stored there. + * @param lh + * LedgerHandle for the ledger + * @param startEntryId + * Start entry Id for the ledger fragment + * @param endEntryId + * End entry Id for the ledger fragment + * @param ledgerFragmentMcb + * MultiCallback to invoke once we've recovered the current + * ledger fragment. + * @param newBookie + * New bookie we want to use to recover and replicate the ledger + * entries that were stored on the failed bookie. + */ + private void recoverLedgerFragment(final InetSocketAddress bookieSrc, final LedgerHandle lh, + final Long startEntryId, final Long endEntryId, final MultiCallback ledgerFragmentMcb, + final InetSocketAddress newBookie) throws InterruptedException { + if (endEntryId == null) { + /* + * Ideally this should never happen if bookie failure is taken care + * of properly. Nothing we can do though in this case. 
+ */ + LOG.warn("Dead bookie (" + bookieSrc + ") is still part of the current active ensemble for ledgerId: " + + lh.getId()); + ledgerFragmentMcb.processResult(BKException.Code.OK, null, null); + return; + } + + ArrayList curEnsemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId); + int bookieIndex = 0; + for (int i = 0; i < curEnsemble.size(); i++) { + if (curEnsemble.get(i).equals(bookieSrc)) { + bookieIndex = i; + break; + } + } + /* + * Loop through all entries in the current ledger fragment range and + * find the ones that were stored on the dead bookie. + */ + List entriesToReplicate = new LinkedList(); + for (long i = startEntryId; i <= endEntryId; i++) { + if (lh.getDistributionSchedule().getReplicaIndex(i, bookieIndex) >= 0) { + /* + * Current entry is stored on the dead bookie so we'll need to + * read it and replicate it to a new bookie. + */ + entriesToReplicate.add(i); + } + } + /* + * Now asynchronously replicate all of the entries for the ledger + * fragment that were on the dead bookie. + */ + MultiCallback ledgerFragmentEntryMcb = new MultiCallback(entriesToReplicate.size(), ledgerFragmentMcb, null); + for (final Long entryId : entriesToReplicate) { + recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, newBookie); + } + } + + /** + * This method asynchronously recovers a specific ledger entry by reading + * the values via the BookKeeper Client (which would read it from the other + * replicas) and then writing it to the chosen new bookie. + * + * @param entryId + * Ledger Entry ID to recover. + * @param lh + * LedgerHandle for the ledger + * @param ledgerFragmentEntryMcb + * MultiCallback to invoke once we've recovered the current + * ledger entry. + * @param newBookie + * New bookie we want to use to recover and replicate the ledger + * entries that were stored on the failed bookie. 
+ */ + private void recoverLedgerFragmentEntry(final Long entryId, final LedgerHandle lh, + final MultiCallback ledgerFragmentEntryMcb, final InetSocketAddress newBookie) throws InterruptedException { + /* + * Read the ledger entry using the LedgerHandle. This will allow us to + * read the entry from one of the other replicated bookies other than + * the dead one. + */ + lh.asyncReadEntries(entryId, entryId, new ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration seq, Object ctx) { + if (rc != Code.OK.intValue()) { + LOG.error("BK error reading ledger entry: " + entryId, BKException.create(rc)); + ledgerFragmentEntryMcb.processResult(rc, null, null); + return; + } + /* + * Now that we've read the ledger entry, write it to the new + * bookie we've selected. + */ + ChannelBuffer toSend = lh.getDigestManager().computeDigestAndPackageForSending(entryId, + lh.getLastAddConfirmed(), seq.nextElement().getEntry()); + bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, toSend, + new WriteCallback() { + @Override + public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, + Object ctx) { + if (rc != Code.OK.intValue()) { + LOG.error("BK error writing entry for ledgerId: " + ledgerId + ", entryId: " + + entryId + ", bookie: " + addr, BKException.create(rc)); + } else { + LOG.debug("Success writing ledger entry to a new bookie!"); + } + /* + * Pass the return code result up the chain with + * the parent callback. + */ + ledgerFragmentEntryMcb.processResult(rc, null, null); + } + }, null); + } + }, null); + } + + /** + * Main method so we can invoke the bookie recovery via command line. + * + * @param args + * Arguments to BookKeeperTools. 2 are required and the third is + * optional. The first is a comma separated list of ZK server + * host:port pairs. The second is the host:port socket address + * for the bookie we are trying to recover. 
The third is the + * host:port socket address of the optional destination bookie + * server we want to replicate the data over to. + * @throws InterruptedException + * @throws IOException + * @throws KeeperException + */ + public static void main(String[] args) throws InterruptedException, IOException, KeeperException { + // Validate the inputs + if (args.length < 2) { + System.err.println("USAGE: BookKeeperTools zkServers bookieSrc [bookieDest]"); + return; + } + // Parse out the input arguments + String zkServers = args[0]; + String bookieSrcString[] = args[1].split(COLON); + if (bookieSrcString.length < 2) { + // Report the raw argument; concatenating the split array would only print its identity hash. + System.err.println("BookieSrc inputted has invalid name format (host:port expected): " + args[1]); + return; + } + final InetSocketAddress bookieSrc = new InetSocketAddress(bookieSrcString[0], Integer + .parseInt(bookieSrcString[1])); + InetSocketAddress bookieDest = null; + // The destination bookie is optional: only parse it when a third argument was actually supplied. + if (args.length >= 3) { + String bookieDestString[] = args[2].split(COLON); + if (bookieDestString.length < 2) { + System.err.println("BookieDest inputted has invalid name format (host:port expected): " + + args[2]); + return; + } + bookieDest = new InetSocketAddress(bookieDestString[0], Integer.parseInt(bookieDestString[1])); + } + + // Create the BookKeeperTools instance and perform the bookie recovery + // synchronously. + BookKeeperTools bkTools = new BookKeeperTools(zkServers); + bkTools.recoverBookieData(bookieSrc, bookieDest); + + // Shutdown the resources used in the BookKeeperTools instance. 
+ bkTools.shutdown(); + } + +} Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java?rev=962697&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java (added) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java Fri Jul 9 21:17:57 2010 @@ -0,0 +1,396 @@ +package org.apache.bookkeeper.test; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.tools.BookKeeperTools; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * This class tests the bookie recovery admin functionality. + */ +public class BookieRecoveryTest extends BaseTestCase { + static Logger LOG = Logger.getLogger(BookieRecoveryTest.class); + + // Object used for synchronizing async method calls + class SyncObject { + boolean value; + + public SyncObject() { + value = false; + } + } + + // Object used for implementing the Bookie RecoverCallback for this jUnit + // test. This verifies that the operation completed successfully. + class BookieRecoverCallback implements RecoverCallback { + @Override + public void recoverComplete(int rc, Object ctx) { + LOG.info("Recovered bookie operation completed with rc: " + rc); + assertTrue(rc == Code.OK.intValue()); + SyncObject sync = (SyncObject) ctx; + synchronized (sync) { + sync.value = true; + sync.notify(); + } + } + } + + // Objects to use for this jUnit test. 
+ DigestType digestType; + SyncObject sync; + BookieRecoverCallback bookieRecoverCb; + BookKeeperTools bkTools; + + // Constructor + public BookieRecoveryTest(DigestType digestType) { + super(3); + this.digestType = digestType; + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + // Set up the configuration properties needed. + System.setProperty("digestType", digestType.toString()); + System.setProperty("passwd", ""); + sync = new SyncObject(); + bookieRecoverCb = new BookieRecoverCallback(); + bkTools = new BookKeeperTools(HOSTPORT); + } + + @After + @Override + public void tearDown() throws Exception { + // Release any resources used by the BookKeeperTools instance. + bkTools.shutdown(); + super.tearDown(); + } + + /** + * Helper method to create a number of ledgers + * + * @param numLedgers + * Number of ledgers to create + * @return List of LedgerHandles for each of the ledgers created + * @throws BKException + * @throws KeeperException + * @throws IOException + * @throws InterruptedException + */ + private List createLedgers(int numLedgers) throws BKException, KeeperException, IOException, + InterruptedException { + List lhs = new ArrayList(); + for (int i = 0; i < numLedgers; i++) { + lhs.add(bkc.createLedger(digestType, System.getProperty("passwd").getBytes())); + } + return lhs; + } + + /** + * Helper method to write dummy ledger entries to all of the ledgers passed. 
+ * + * @param numEntries + * Number of ledger entries to write for each ledger + * @param startEntryId + * The first entry Id we're expecting to write for each ledger + * @param lhs + * List of LedgerHandles for all ledgers to write entries to + * @throws BKException + * @throws InterruptedException + */ + private void writeEntriestoLedgers(int numEntries, long startEntryId, List lhs) throws BKException, + InterruptedException { + for (LedgerHandle lh : lhs) { + for (int i = 0; i < numEntries; i++) { + lh.addEntry(("LedgerId: " + lh.getId() + ", EntryId: " + (startEntryId + i)).getBytes()); + } + } + } + + /** + * Helper method to startup a new bookie server with the indicated port + * number + * + * @param port + * Port to start the new bookie server on + * @throws IOException + */ + private void startNewBookie(int port) throws IOException { + File f = File.createTempFile("bookie", "test"); + tmpDirs.add(f); + f.delete(); + f.mkdir(); + BookieServer server = new BookieServer(port, HOSTPORT, f, new File[] { f }); + server.start(); + bs.add(server); + LOG.info("New bookie on port " + port + " has been created."); + } + + /** + * Helper method to verify that we can read the recovered ledger entries. + * + * @param numLedgers + * Number of ledgers to verify + * @param startEntryId + * Start Entry Id to read + * @param endEntryId + * End Entry Id to read + * @throws BKException + * @throws InterruptedException + */ + private void verifyRecoveredLedgers(int numLedgers, long startEntryId, long endEntryId) throws BKException, + InterruptedException { + // Get a set of LedgerHandles for all of the ledgers to verify + List lhs = new ArrayList(); + for (int i = 0; i < numLedgers; i++) { + lhs.add(bkc.openLedger(i + 1, digestType, System.getProperty("passwd").getBytes())); + } + // Read the ledger entries to verify that they are all present and + // correct in the new bookie. 
+ for (LedgerHandle lh : lhs) { + Enumeration entries = lh.readEntries(startEntryId, endEntryId); + while (entries.hasMoreElements()) { + LedgerEntry entry = entries.nextElement(); + assertTrue(new String(entry.getEntry()).equals("LedgerId: " + entry.getLedgerId() + ", EntryId: " + + entry.getEntryId())); + } + } + + } + + /** + * This tests the asynchronous bookie recovery functionality by writing + * entries into 3 bookies, killing one bookie, starting up a new one to + * replace it, and then recovering the ledger entries from the killed bookie + * onto the new one. We'll verify that the entries stored on the killed + * bookie are properly copied over and restored onto the new one. + * + * @throws Exception + */ + @Test + public void testAsyncBookieRecoveryToSpecificBookie() throws Exception { + // Create the ledgers + int numLedgers = 3; + List lhs = createLedgers(numLedgers); + + // Write the entries for the ledgers with dummy values. + int numMsgs = 10; + writeEntriestoLedgers(numMsgs, 0, lhs); + + // Shutdown the first bookie server + LOG.info("Finished writing all ledger entries so shutdown one of the bookies."); + bs.get(0).shutdown(); + bs.remove(0); + + // Startup a new bookie server + int newBookiePort = initialPort + numBookies; + startNewBookie(newBookiePort); + + // Write some more entries for the ledgers so a new ensemble will be + // created for them. + writeEntriestoLedgers(numMsgs, 10, lhs); + + // Call the async recover bookie method. 
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort); + InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort); + LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one (" + + bookieDest + ")"); + // Initiate the sync object + sync.value = false; + bkTools.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync); + + // Wait for the async method to complete. + synchronized (sync) { + while (sync.value == false) { + sync.wait(); + } + } + + // Verify the recovered ledger entries are okay. + verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1); + } + + /** + * This tests the asynchronous bookie recovery functionality by writing + * entries into 3 bookies, killing one bookie, starting up a few new + * bookies, and then recovering the ledger entries from the killed bookie + * onto random available bookie servers. We'll verify that the entries + * stored on the killed bookie are properly copied over and restored onto + * the other bookies. + * + * @throws Exception + */ + @Test + public void testAsyncBookieRecoveryToRandomBookies() throws Exception { + // Create the ledgers + int numLedgers = 3; + List lhs = createLedgers(numLedgers); + + // Write the entries for the ledgers with dummy values. + int numMsgs = 10; + writeEntriestoLedgers(numMsgs, 0, lhs); + + // Shutdown the first bookie server + LOG.info("Finished writing all ledger entries so shutdown one of the bookies."); + bs.get(0).shutdown(); + bs.remove(0); + + // Startup three new bookie servers + for (int i = 0; i < 3; i++) { + int newBookiePort = initialPort + numBookies + i; + startNewBookie(newBookiePort); + } + + // Write some more entries for the ledgers so a new ensemble will be + // created for them. + writeEntriestoLedgers(numMsgs, 10, lhs); + + // Call the async recover bookie method. 
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort); + InetSocketAddress bookieDest = null; + LOG.info("Now recover the data on the killed bookie (" + bookieSrc + + ") and replicate it to a random available one"); + // Initiate the sync object + sync.value = false; + bkTools.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync); + + // Wait for the async method to complete. + synchronized (sync) { + while (sync.value == false) { + sync.wait(); + } + } + + // Verify the recovered ledger entries are okay. + verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1); + } + + /** + * This tests the synchronous bookie recovery functionality by writing + * entries into 3 bookies, killing one bookie, starting up a new one to + * replace it, and then recovering the ledger entries from the killed bookie + * onto the new one. We'll verify that the entries stored on the killed + * bookie are properly copied over and restored onto the new one. + * + * @throws Exception + */ + @Test + public void testSyncBookieRecoveryToSpecificBookie() throws Exception { + // Create the ledgers + int numLedgers = 3; + List lhs = createLedgers(numLedgers); + + // Write the entries for the ledgers with dummy values. + int numMsgs = 10; + writeEntriestoLedgers(numMsgs, 0, lhs); + + // Shutdown the first bookie server + LOG.info("Finished writing all ledger entries so shutdown one of the bookies."); + bs.get(0).shutdown(); + bs.remove(0); + + // Startup a new bookie server + int newBookiePort = initialPort + numBookies; + startNewBookie(newBookiePort); + + // Write some more entries for the ledgers so a new ensemble will be + // created for them. + writeEntriestoLedgers(numMsgs, 10, lhs); + + // Call the sync recover bookie method. 
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort); + InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort); + LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one (" + + bookieDest + ")"); + bkTools.recoverBookieData(bookieSrc, bookieDest); + + // Verify the recovered ledger entries are okay. + verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1); + } + + /** + * This tests the synchronous bookie recovery functionality by writing + * entries into 3 bookies, killing one bookie, starting up a few new + * bookies, and then recovering the ledger entries from the killed bookie + * onto random available bookie servers. We'll verify that the entries + * stored on the killed bookie are properly copied over and restored onto + * the other bookies. + * + * @throws Exception + */ + @Test + public void testSyncBookieRecoveryToRandomBookies() throws Exception { + // Create the ledgers + int numLedgers = 3; + List lhs = createLedgers(numLedgers); + + // Write the entries for the ledgers with dummy values. + int numMsgs = 10; + writeEntriestoLedgers(numMsgs, 0, lhs); + + // Shutdown the first bookie server + LOG.info("Finished writing all ledger entries so shutdown one of the bookies."); + bs.get(0).shutdown(); + bs.remove(0); + + // Startup three new bookie servers + for (int i = 0; i < 3; i++) { + int newBookiePort = initialPort + numBookies + i; + startNewBookie(newBookiePort); + } + + // Write some more entries for the ledgers so a new ensemble will be + // created for them. + writeEntriestoLedgers(numMsgs, 10, lhs); + + // Call the sync recover bookie method. 
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort); + InetSocketAddress bookieDest = null; + LOG.info("Now recover the data on the killed bookie (" + bookieSrc + + ") and replicate it to a random available one"); + bkTools.recoverBookieData(bookieSrc, bookieDest); + + // Verify the recovered ledger entries are okay. + verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1); + } + +}