zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1428258 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ bookkeeper-server/src/test/java/org/apache/bookkeeper/test/
Date Thu, 03 Jan 2013 10:06:24 GMT
Author: ivank
Date: Thu Jan  3 10:06:24 2013
New Revision: 1428258

URL: http://svn.apache.org/viewvc?rev=1428258&view=rev
Log:
BOOKKEEPER-409: Integration Test - Perform bookie rereplication cycle by Auditor-RW processes
(rakeshr via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1428258&r1=1428257&r2=1428258&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Jan  3 10:06:24 2013
@@ -281,6 +281,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-463: Refactor garbage collection code for ease to plugin different GC
algorithm. (Fangmin, ivank, fpj via sijie)
 
+        BOOKKEEPER-409: Integration Test - Perform bookie rereplication cycle by Auditor-RW
processes (rakeshr via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations
in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1428258&r1=1428257&r2=1428258&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
Thu Jan  3 10:06:24 2013
@@ -289,13 +289,15 @@ public class ReplicationWorker implement
             }
             workerRunning = false;
         }
+        this.pendingReplicationTimer.cancel();
         try {
-            underreplicationManager.close();
-        } catch (UnavailableException e) {
-            LOG.warn("Exception while closing the "
-                    + "ZkLedgerUnderrepliationManager", e);
+            this.workerThread.interrupt();
+            this.workerThread.join();
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted during shutting down replication worker : ",
+                    e);
+            Thread.currentThread().interrupt();
         }
-        this.pendingReplicationTimer.cancel();
         try {
             bkc.close();
         } catch (InterruptedException e) {
@@ -305,12 +307,10 @@ public class ReplicationWorker implement
             LOG.warn("Exception while closing the Bookie client", e);
         }
         try {
-            this.workerThread.interrupt();
-            this.workerThread.join();
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted during shutting down replication worker : ",
-                    e);
-            Thread.currentThread().interrupt();
+            underreplicationManager.close();
+        } catch (UnavailableException e) {
+            LOG.warn("Exception while closing the "
+                    + "ZkLedgerUnderrepliationManager", e);
         }
     }
 

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java?rev=1428258&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
Thu Jan  3 10:06:24 2013
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerHandleAdapter;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.test.MultiLedgerManagerTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test that verifies the complete functionality of the
+ * Auditor-rereplication process: the Auditor publishes bookie failures, and
+ * the ReplicationWorker then receives the notifications and acts on them.
+ */
+public class BookieAutoRecoveryTest extends
+        MultiLedgerManagerTestCase {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(BookieAutoRecoveryTest.class);
+    private static final byte[] PASSWD = "admin".getBytes();
+    private static final byte[] data = "TESTDATA".getBytes();
+    private static final String openLedgerRereplicationGracePeriod = "3000"; // milliseconds
+
+    private DigestType digestType;
+    private LedgerManagerFactory mFactory;
+    private LedgerUnderreplicationManager underReplicationManager;
+    private LedgerManager ledgerManager;
+
+    private final String UNDERREPLICATED_PATH = baseClientConf
+            .getZkLedgersRootPath() + "/underreplication/ledgers";
+
+    public BookieAutoRecoveryTest(String ledgerManagerFactory) throws IOException, KeeperException,
+            InterruptedException, UnavailableException, CompatibilityException {
+        super(3);
+        LOG.info("Running test case using ledger manager : "
+                + ledgerManagerFactory);
+        // set ledger manager name
+        baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+        baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod);
+        baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+        this.digestType = DigestType.MAC;
+        setAutoRecoveryEnabled(true);
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        // initialize urReplicationManager
+        mFactory = LedgerManagerFactory.newLedgerManagerFactory(baseClientConf,
+                zkc);
+        underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+        LedgerManagerFactory newLedgerManagerFactory = LedgerManagerFactory
+                .newLedgerManagerFactory(baseClientConf, zkc);
+        ledgerManager = newLedgerManagerFactory.newLedgerManager();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (null != mFactory) {
+            mFactory.uninitialize();
+            mFactory = null;
+        }
+        if (null != underReplicationManager) {
+            underReplicationManager.close();
+            underReplicationManager = null;
+        }
+        if (null != ledgerManager) {
+            ledgerManager.close();
+            ledgerManager = null;
+        }
+    }
+
+    /**
+     * Verifies that the Auditor publishes the urLedger and that the replication
+     * worker picks up the entries and finishes rereplicating the open ledger.
+     */
+    @Test(timeout = 90000)
+    public void testOpenLedgers() throws Exception {
+        List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5);
+        LedgerHandle lh = listOfLedgerHandle.get(0);
+        int ledgerReplicaIndex = 0;
+        InetSocketAddress replicaToKillAddr = LedgerHandleAdapter
+                .getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
+
+        final String urLedgerZNode = getUrLedgerZNode(lh);
+        ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        assertNull("UrLedger already exists!",
+                watchUrLedgerNode(urLedgerZNode, latch));
+
+        LOG.info("Killing Bookie :" + replicaToKillAddr);
+        killBookie(replicaToKillAddr);
+
+        // waiting to publish urLedger znode by Auditor
+        latch.await();
+        latch = new CountDownLatch(1);
+        LOG.info("Watching on urLedgerPath:" + urLedgerZNode
+                + " to know the status of rereplication process");
+        assertNotNull("UrLedger doesn't exists!",
+                watchUrLedgerNode(urLedgerZNode, latch));
+
+        // starting the replication service, so that it will be able to act as
+        // the target bookie
+        startNewBookie();
+        int newBookieIndex = bs.size() - 1;
+        BookieServer newBookieServer = bs.get(newBookieIndex);
+
+        LOG.debug("Waiting to finish the replication of failed bookie : "
+                + replicaToKillAddr);
+        latch.await();
+
+        // grace period to update the urledger metadata in zookeeper
+        LOG.info("Waiting to update the urledger metadata in zookeeper");
+
+        verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
+                listOfLedgerHandle.get(0), ledgerReplicaIndex);
+    }
+
+    /**
+     * Verifies that the Auditor publishes the urLedger and that the replication
+     * worker picks up the entries and finishes rereplicating closed ledgers.
+     */
+    @Test(timeout = 90000)
+    public void testClosedLedgers() throws Exception {
+        List<Integer> listOfReplicaIndex = new ArrayList<Integer>();
+        List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5);
+        closeLedgers(listOfLedgerHandle);
+        LedgerHandle lhandle = listOfLedgerHandle.get(0);
+        int ledgerReplicaIndex = 0;
+        InetSocketAddress replicaToKillAddr = LedgerHandleAdapter
+                .getLedgerMetadata(lhandle).getEnsembles().get(0L).get(0);
+
+        CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
+        for (LedgerHandle lh : listOfLedgerHandle) {
+            ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr);
+            listOfReplicaIndex.add(ledgerReplicaIndex);
+            assertNull("UrLedger already exists!",
+                    watchUrLedgerNode(getUrLedgerZNode(lh), latch));
+        }
+
+        LOG.info("Killing Bookie :" + replicaToKillAddr);
+        killBookie(replicaToKillAddr);
+
+        // waiting to publish urLedger znode by Auditor
+        latch.await();
+
+        // Again watching the urLedger znode to know the replication status
+        latch = new CountDownLatch(listOfLedgerHandle.size());
+        for (LedgerHandle lh : listOfLedgerHandle) {
+            String urLedgerZNode = getUrLedgerZNode(lh);
+            LOG.info("Watching on urLedgerPath:" + urLedgerZNode
+                    + " to know the status of rereplication process");
+            assertNotNull("UrLedger doesn't exists!",
+                    watchUrLedgerNode(urLedgerZNode, latch));
+        }
+
+        // starting the replication service, so that it will be able to act as
+        // the target bookie
+        startNewBookie();
+        int newBookieIndex = bs.size() - 1;
+        BookieServer newBookieServer = bs.get(newBookieIndex);
+
+        LOG.debug("Waiting to finish the replication of failed bookie : "
+                + replicaToKillAddr);
+
+        // waiting to finish replication
+        latch.await();
+
+        // grace period to update the urledger metadata in zookeeper
+        LOG.info("Waiting to update the urledger metadata in zookeeper");
+
+        for (int index = 0; index < listOfLedgerHandle.size(); index++) {
+            verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
+                    listOfLedgerHandle.get(index),
+                    listOfReplicaIndex.get(index));
+        }
+    }
+
+    /**
+     * Tests stopping the replication service while replication is in progress,
+     * as would happen when an exception shuts down the Auditor and RW processes.
+     * After a restart, the re-replication activities should still complete.
+     */
+    @Test(timeout = 90000)
+    public void testStopWhileReplicationInProgress() throws Exception {
+        int numberOfLedgers = 2;
+        List<Integer> listOfReplicaIndex = new ArrayList<Integer>();
+        List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(
+                numberOfLedgers, 5);
+        closeLedgers(listOfLedgerHandle);
+        LedgerHandle handle = listOfLedgerHandle.get(0);
+        InetSocketAddress replicaToKillAddr = LedgerHandleAdapter
+                .getLedgerMetadata(handle).getEnsembles().get(0L).get(0);
+        LOG.info("Killing Bookie:" + replicaToKillAddr);
+
+        // Each ledger, there will be two events : create urLedger and after
+        // rereplication delete urLedger
+        CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
+        for (int i = 0; i < listOfLedgerHandle.size(); i++) {
+            final String urLedgerZNode = getUrLedgerZNode(listOfLedgerHandle
+                    .get(i));
+            assertNull("UrLedger already exists!",
+                    watchUrLedgerNode(urLedgerZNode, latch));
+            int replicaIndexInLedger = getReplicaIndexInLedger(
+                    listOfLedgerHandle.get(i), replicaToKillAddr);
+            listOfReplicaIndex.add(replicaIndexInLedger);
+        }
+
+        LOG.info("Killing Bookie :" + replicaToKillAddr);
+        killBookie(replicaToKillAddr);
+
+        // waiting to publish urLedger znode by Auditor
+        latch.await();
+
+        // Again watching the urLedger znode to know the replication status
+        latch = new CountDownLatch(listOfLedgerHandle.size());
+        for (LedgerHandle lh : listOfLedgerHandle) {
+            String urLedgerZNode = getUrLedgerZNode(lh);
+            LOG.info("Watching on urLedgerPath:" + urLedgerZNode
+                    + " to know the status of rereplication process");
+            assertNotNull("UrLedger doesn't exists!",
+                    watchUrLedgerNode(urLedgerZNode, latch));
+        }
+
+        // starting the replication service, so that it will be able to act as
+        // the target bookie
+        startNewBookie();
+        int newBookieIndex = bs.size() - 1;
+        BookieServer newBookieServer = bs.get(newBookieIndex);
+
+        LOG.debug("Waiting to finish the replication of failed bookie : "
+                + replicaToKillAddr);
+        while (true) {
+            if (latch.getCount() < numberOfLedgers || latch.getCount() <= 0) {
+                stopReplicationService();
+                LOG.info("Latch Count is:" + latch.getCount());
+                break;
+            }
+            // grace period to take breath
+            Thread.sleep(1000);
+        }
+
+        startReplicationService();
+
+        LOG.info("Waiting to finish rereplication processes");
+        latch.await();
+
+        // grace period to update the urledger metadata in zookeeper
+        LOG.info("Waiting to update the urledger metadata in zookeeper");
+
+        for (int index = 0; index < listOfLedgerHandle.size(); index++) {
+            verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
+                    listOfLedgerHandle.get(index),
+                    listOfReplicaIndex.get(index));
+        }
+    }
+
+    /**
+     * Verify that published urledgers of deleted ledgers (ledgers that were
+     * deleted after the Auditor published them as urledgers) are cleared off
+     * by the newly selected replica bookie
+     */
+    @Test(timeout = 30000)
+    public void testNoSuchLedgerExists() throws Exception {
+        List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(2, 5);
+        CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
+        for (LedgerHandle lh : listOfLedgerHandle) {
+            assertNull("UrLedger already exists!",
+                    watchUrLedgerNode(getUrLedgerZNode(lh), latch));
+        }
+        InetSocketAddress replicaToKillAddr = LedgerHandleAdapter
+                .getLedgerMetadata(listOfLedgerHandle.get(0)).getEnsembles()
+                .get(0L).get(0);
+        killBookie(replicaToKillAddr);
+        replicaToKillAddr = LedgerHandleAdapter
+                .getLedgerMetadata(listOfLedgerHandle.get(0)).getEnsembles()
+                .get(0L).get(0);
+        killBookie(replicaToKillAddr);
+        // waiting to publish urLedger znode by Auditor
+        latch.await();
+
+        latch = new CountDownLatch(listOfLedgerHandle.size());
+        for (LedgerHandle lh : listOfLedgerHandle) {
+            assertNotNull("UrLedger doesn't exists!",
+                    watchUrLedgerNode(getUrLedgerZNode(lh), latch));
+        }
+
+        // delete ledgers
+        for (LedgerHandle lh : listOfLedgerHandle) {
+            bkc.deleteLedger(lh.getId());
+        }
+        startNewBookie();
+
+        // waiting to delete published urledgers, since it doesn't exists
+        latch.await();
+
+        for (LedgerHandle lh : listOfLedgerHandle) {
+            assertNull("UrLedger still exists after rereplication",
+                    watchUrLedgerNode(getUrLedgerZNode(lh), latch));
+        }
+    }
+
+    private int getReplicaIndexInLedger(LedgerHandle lh,
+            InetSocketAddress replicaToKill) {
+        SortedMap<Long, ArrayList<InetSocketAddress>> ensembles = LedgerHandleAdapter
+                .getLedgerMetadata(lh).getEnsembles();
+        int ledgerReplicaIndex = -1;
+        for (InetSocketAddress addr : ensembles.get(0L)) {
+            ++ledgerReplicaIndex;
+            if (addr.equals(replicaToKill)) {
+                break;
+            }
+        }
+        return ledgerReplicaIndex;
+    }
+
+    private void verifyLedgerEnsembleMetadataAfterReplication(
+            BookieServer newBookieServer, LedgerHandle lh,
+            int ledgerReplicaIndex) throws BKException, InterruptedException {
+        LedgerHandle openLedger = bkc
+                .openLedger(lh.getId(), digestType, PASSWD);
+
+        InetSocketAddress inetSocketAddress = LedgerHandleAdapter
+                .getLedgerMetadata(openLedger).getEnsembles().get(0L)
+                .get(ledgerReplicaIndex);
+        assertEquals("Rereplication has been failed and ledgerReplicaIndex :"
+                + ledgerReplicaIndex, newBookieServer.getLocalAddress(),
+                inetSocketAddress);
+    }
+
+    private void closeLedgers(List<LedgerHandle> listOfLedgerHandle)
+            throws InterruptedException, BKException {
+        for (LedgerHandle lh : listOfLedgerHandle) {
+            lh.close();
+        }
+    }
+
+    private List<LedgerHandle> createLedgersAndAddEntries(int numberOfLedgers,
+            int numberOfEntries) throws InterruptedException, BKException {
+        List<LedgerHandle> listOfLedgerHandle = new ArrayList<LedgerHandle>(
+                numberOfLedgers);
+        for (int index = 0; index < numberOfLedgers; index++) {
+            LedgerHandle lh = bkc.createLedger(3, 3, digestType, PASSWD);
+            listOfLedgerHandle.add(lh);
+            for (int i = 0; i < numberOfEntries; i++) {
+                lh.addEntry(data);
+            }
+        }
+        return listOfLedgerHandle;
+    }
+
+    private String getUrLedgerZNode(LedgerHandle lh) {
+        return ZkLedgerUnderreplicationManager.getUrLedgerZnode(
+                UNDERREPLICATED_PATH, lh.getId());
+    }
+
+    private Stat watchUrLedgerNode(final String znode,
+            final CountDownLatch latch) throws KeeperException,
+            InterruptedException {
+        return zkc.exists(znode, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getType() == EventType.NodeDeleted) {
+                    LOG.info("Recieved Ledger rereplication completion event :"
+                            + event.getType());
+                    latch.countDown();
+                }
+                if (event.getType() == EventType.NodeCreated) {
+                    LOG.info("Recieved urLedger publishing event :"
+                            + event.getType());
+                    latch.countDown();
+                }
+            }
+        });
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1428258&r1=1428257&r2=1428258&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
Thu Jan  3 10:06:24 2013
@@ -21,33 +21,38 @@
 
 package org.apache.bookkeeper.test;
 
-import java.io.IOException;
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.bookkeeper.bookie.BookieException;
+import junit.framework.TestCase;
+
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.BookKeeperTestClient;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.metastore.InMemoryMetaStore;
 import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.AutoRecoveryMain;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.commons.io.FileUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
 import org.junit.Before;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import junit.framework.TestCase;
-
 /**
  * A class runs several bookie servers for testing.
  */
@@ -69,6 +74,10 @@ public abstract class BookKeeperClusterT
     protected ServerConfiguration baseConf = new ServerConfiguration();
     protected ClientConfiguration baseClientConf = new ClientConfiguration();
 
+    private Map<BookieServer, AutoRecoveryMain> autoRecoveryProcesses = new HashMap<BookieServer,
AutoRecoveryMain>();
+
+    private boolean isAutoRecoveryEnabled;
+
     public BookKeeperClusterTestCase(int numBookies) {
         this.numBookies = numBookies;
     }
@@ -123,7 +132,8 @@ public abstract class BookKeeperClusterT
     }
 
     /**
-     * Start cluster
+     * Start cluster. Also, starts the auto recovery process for each bookie, if
+     * isAutoRecoveryEnabled is true.
      *
      * @throws Exception
      */
@@ -140,7 +150,8 @@ public abstract class BookKeeperClusterT
     }
 
     /**
-     * Stop cluster
+     * Stop cluster. Also, stops all the auto recovery processes for the bookie
+     * cluster, if isAutoRecoveryEnabled is true.
      *
      * @throws Exception
      */
@@ -151,6 +162,12 @@ public abstract class BookKeeperClusterT
 
         for (BookieServer server : bs) {
             server.shutdown();
+            AutoRecoveryMain autoRecovery = autoRecoveryProcesses.get(server);
+            if (autoRecovery != null && isAutoRecoveryEnabled()) {
+                autoRecovery.shutdown();
+                LOG.debug("Shutdown auto recovery for bookieserver:"
+                        + server.getLocalAddress());
+            }
         }
         bs.clear();
         for (File f : tmpDirs) {
@@ -183,10 +200,11 @@ public abstract class BookKeeperClusterT
     }
 
     /**
-     * Kill a bookie by its socket address
+     * Kill a bookie by its socket address. Also, stops the autorecovery process
+     * for the corresponding bookie server, if isAutoRecoveryEnabled is true.
      *
      * @param addr
-     *          Socket Address
+     *            Socket Address
      * @return the configuration of killed bookie
      * @throws InterruptedException
      */
@@ -202,6 +220,7 @@ public abstract class BookKeeperClusterT
             ++toRemoveIndex;
         }
         if (toRemove != null) {
+            stopAutoRecoveryService(toRemove);
             bs.remove(toRemove);
             return bsConfs.remove(toRemoveIndex);
         }
@@ -209,10 +228,11 @@ public abstract class BookKeeperClusterT
     }
 
     /**
-     * Kill a bookie by index
+     * Kill a bookie by index. Also, stops the respective auto recovery process
+     * for this bookie, if isAutoRecoveryEnabled is true.
      *
      * @param index
-     *          Bookie Index
+     *            Bookie Index
      * @return the configuration of killed bookie
      * @throws InterruptedException
      * @throws IOException
@@ -223,6 +243,7 @@ public abstract class BookKeeperClusterT
         }
         BookieServer server = bs.get(index);
         server.shutdown();
+        stopAutoRecoveryService(server);
         bs.remove(server);
         return bsConfs.remove(index);
     }
@@ -302,7 +323,8 @@ public abstract class BookKeeperClusterT
     }
 
     /**
-     * Restart bookie servers
+     * Restart bookie servers. Also restarts all the respective auto recovery
+     * process, if isAutoRecoveryEnabled is true.
      *
      * @throws InterruptedException
      * @throws IOException
@@ -315,10 +337,11 @@ public abstract class BookKeeperClusterT
     }
 
     /**
-     * Restart bookie servers using new configuration settings
+     * Restart bookie servers using new configuration settings. Also restart the
+     * respective auto recovery process, if isAutoRecoveryEnabled is true.
      *
      * @param newConf
-     *          New Configuration Settings
+     *            New Configuration Settings
      * @throws InterruptedException
      * @throws IOException
      * @throws KeeperException
@@ -329,6 +352,7 @@ public abstract class BookKeeperClusterT
         // shut down bookie server
         for (BookieServer server : bs) {
             server.shutdown();
+            stopAutoRecoveryService(server);
         }
         bs.clear();
         Thread.sleep(1000);
@@ -345,7 +369,8 @@ public abstract class BookKeeperClusterT
 
     /**
      * Helper method to startup a new bookie server with the indicated port
-     * number
+     * number. Also, starts the auto recovery process, if the
+     * isAutoRecoveryEnabled is set true.
      *
      * @param port
      *            Port to start the new bookie server on
@@ -368,14 +393,16 @@ public abstract class BookKeeperClusterT
     }
 
     /**
-     * Helper method to startup a bookie server using a configuration object
+     * Helper method to startup a bookie server using a configuration object.
+     * Also, starts the auto recovery process if isAutoRecoveryEnabled is true.
      *
      * @param conf
      *            Server Configuration Object
      *
      */
     protected BookieServer startBookie(ServerConfiguration conf)
-            throws IOException, InterruptedException, KeeperException, BookieException {
+            throws IOException, InterruptedException, KeeperException,
+            BookieException {
         BookieServer server = new BookieServer(conf);
         server.start();
 
@@ -387,11 +414,19 @@ public abstract class BookKeeperClusterT
         bkc.readBookiesBlocking();
         LOG.info("New bookie on port " + port + " has been created.");
 
+        try {
+            startAutoRecovery(server, conf);
+        } catch (CompatibilityException ce) {
+            LOG.error("Exception while starting AutoRecovery!", ce);
+        } catch (UnavailableException ue) {
+            LOG.error("Exception while starting AutoRecovery!", ue);
+        }
         return server;
     }
 
     /**
-     * Start a bookie with the given bookie instance.
+     * Start a bookie with the given bookie instance. Also, starts the auto
+     * recovery for this bookie, if isAutoRecoveryEnabled is true.
      */
     protected BookieServer startBookie(ServerConfiguration conf, final Bookie b)
             throws IOException, InterruptedException, KeeperException, BookieException {
@@ -410,11 +445,105 @@ public abstract class BookKeeperClusterT
 
         bkc.readBookiesBlocking();
         LOG.info("New bookie on port " + port + " has been created.");
-
+        try {
+            startAutoRecovery(server, conf);
+        } catch (CompatibilityException ce) {
+            LOG.error("Exception while starting AutoRecovery!", ce);
+        } catch (UnavailableException ue) {
+            LOG.error("Exception while starting AutoRecovery!", ue);
+        }
         return server;
     }
 
     public void setMetastoreImplClass(AbstractConfiguration conf) {
         conf.setMetastoreImplClass(InMemoryMetaStore.class.getName());
     }
+
+    /**
+     * Flag used to enable/disable the auto recovery process. If it is enabled,
+     * starting a bookie server also starts the auto recovery process for that
+     * bookie, and stopping a bookie stops the respective auto recovery
+     * process.
+     *
+     * @param isAutoRecoveryEnabled
+     *            Value true will enable the auto recovery process. Value false
+     *            will disable the auto recovery process
+     */
+    public void setAutoRecoveryEnabled(boolean isAutoRecoveryEnabled) {
+        this.isAutoRecoveryEnabled = isAutoRecoveryEnabled;
+    }
+
+    /**
+     * Flag used to check whether auto recovery process is enabled/disabled. By
+     * default the flag is false.
+     *
+     * @return true, if the auto recovery is enabled. Otherwise return false.
+     */
+    public boolean isAutoRecoveryEnabled() {
+        return isAutoRecoveryEnabled;
+    }
+
+    private void startAutoRecovery(BookieServer bserver,
+            ServerConfiguration conf) throws CompatibilityException,
+            KeeperException, InterruptedException, IOException,
+            UnavailableException {
+        if (isAutoRecoveryEnabled()) {
+            AutoRecoveryMain autoRecoveryProcess = new AutoRecoveryMain(conf);
+            autoRecoveryProcess.start();
+            autoRecoveryProcesses.put(bserver, autoRecoveryProcess);
+            LOG.debug("Starting Auditor Recovery for the bookie:"
+                    + bserver.getLocalAddress());
+        }
+    }
+
+    private void stopAutoRecoveryService(BookieServer toRemove) {
+        AutoRecoveryMain autoRecoveryMain = autoRecoveryProcesses
+                .remove(toRemove);
+        if (null != autoRecoveryMain && isAutoRecoveryEnabled()) {
+            autoRecoveryMain.shutdown();
+            LOG.debug("Shutdown auto recovery for bookieserver:"
+                    + toRemove.getLocalAddress());
+        }
+    }
+
+    /**
+     * Starts the auto recovery process for the bookie servers, one auto
+     * recovery process per bookie server, if isAutoRecoveryEnabled is
+     * true.
+     *
+     * @throws CompatibilityException
+     *             - Compatibility error
+     * @throws KeeperException
+     *             - ZK exception
+     * @throws InterruptedException
+     *             - interrupted exception
+     * @throws IOException
+     *             - IOException
+     * @throws UnavailableException
+     *             - replication service has become unavailable
+     */
+    public void startReplicationService() throws CompatibilityException,
+            KeeperException, InterruptedException, IOException,
+            UnavailableException {
+        int index = -1;
+        for (BookieServer bserver : bs) {
+            startAutoRecovery(bserver, bsConfs.get(++index));
+        }
+    }
+
+    /**
+     * Stops all the auto recovery processes for the bookie cluster, if
+     * isAutoRecoveryEnabled is true.
+     */
+    public void stopReplicationService() {
+        if(false == isAutoRecoveryEnabled()){
+            return;
+        }
+        for (Entry<BookieServer, AutoRecoveryMain> autoRecoveryProcess : autoRecoveryProcesses
+                .entrySet()) {
+            autoRecoveryProcess.getValue().shutdown();
+            LOG.debug("Shutdown Auditor Recovery for the bookie:"
+                    + autoRecoveryProcess.getKey().getLocalAddress());
+        }
+    }
 }



Mime
View raw message