Author: ivank
Date: Mon Aug 27 14:55:17 2012
New Revision: 1377703
URL: http://svn.apache.org/viewvc?rev=1377703&view=rev
Log:
BOOKKEEPER-304: Prepare bookie vs ledgers cache and will be used by the Auditor (rakeshr via
ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1377703&r1=1377702&r2=1377703&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Aug 27 14:55:17 2012
@@ -102,6 +102,8 @@ Trunk (unreleased changes)
BOOKKEEPER-248: Rereplicating of under replicated data (umamahesh via ivank)
+ BOOKKEEPER-304: Prepare bookie vs ledgers cache and will be used by the Auditor (rakeshr
via ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations
in Hedwig (sijie via ivank)
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java?rev=1377703&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
Mon Aug 27 14:55:17 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds an index from each bookie to the ledgers whose ensembles include it. Every
+ * call reads the ledger metadata afresh through the ledger manager; nothing is cached.
+ */
+public class BookieLedgerIndexer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BookieLedgerIndexer.class);
+ private final LedgerManager ledgerManager;
+
+ public BookieLedgerIndexer(LedgerManager ledgerManager) {
+ this.ledgerManager = ledgerManager;
+ }
+
+ /**
+ * Builds the bookie-to-ledgers map by reading the metadata of every ledger and
+ * recording the ledger id under each bookie of each ensemble. Blocks until done.
+ *
+ * @return map of bookie address string to the ids of the ledgers it appears in
+ * @throws BKAuditException
+ * if interrupted while waiting, or if processing did not report Code.OK
+ */
+ public Map<String, Set<Long>> getBookieToLedgerIndex()
+ throws BKAuditException {
+ // bookie address -> ids of the ledgers whose ensembles include that bookie
+ final ConcurrentHashMap<String, Set<Long>> bookie2ledgersMap
+ = new ConcurrentHashMap<String, Set<Long>>();
+ final CountDownLatch ledgerCollectorLatch = new CountDownLatch(1);
+
+ Processor<Long> ledgerProcessor = new Processor<Long>() {
+ @Override
+ public void process(final Long ledgerId,
+ final AsyncCallback.VoidCallback iterCallback) {
+ GenericCallback<LedgerMetadata> genericCallback = new GenericCallback<LedgerMetadata>()
{
+ @Override
+ public void operationComplete(final int rc,
+ LedgerMetadata ledgerMetadata) {
+ if (rc == BKException.Code.OK) {
+ StringBuilder bookieAddr;
+ for (Map.Entry<Long, ArrayList<InetSocketAddress>>
ensemble : ledgerMetadata
+ .getEnsembles().entrySet()) {
+ for (InetSocketAddress bookie : ensemble
+ .getValue()) {
+ bookieAddr = new StringBuilder();
+ StringUtils
+ .addrToString(bookieAddr, bookie);
+ putLedger(bookie2ledgersMap, bookieAddr
+ .toString(), ledgerId);
+ }
+ }
+ } else {
+ LOG.warn("Unable to read the ledger:" + ledgerId
+ + " information");
+ }
+ iterCallback.processResult(rc, null, null);
+ }
+ };
+ ledgerManager.readLedgerMetadata(ledgerId, genericCallback);
+ }
+ };
+ // The final callback stores the overall result code and releases the latch
+ final List<Integer> resultCode = new ArrayList<Integer>(1);
+ ledgerManager.asyncProcessLedgers(ledgerProcessor,
+ new AsyncCallback.VoidCallback() {
+
+ @Override
+ public void processResult(int rc, String s, Object obj) {
+ resultCode.add(rc);
+ ledgerCollectorLatch.countDown();
+ }
+ }, null, BKException.Code.OK, BKException.Code.ReadException);
+ try {
+ ledgerCollectorLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new BKAuditException(
+ "Exception while getting the bookie-ledgers", e);
+ }
+ if (!resultCode.contains(BKException.Code.OK)) {
+ throw new BKAuditException(
+ "Exception while getting the bookie-ledgers", BKException
+ .create(resultCode.get(0)));
+ }
+ return bookie2ledgersMap;
+ }
+
+ private void putLedger(ConcurrentHashMap<String, Set<Long>> bookie2ledgersMap,
+ String bookie, long ledgerId) {
+ Set<Long> ledgers = bookie2ledgersMap.get(bookie);
+ // first ledger seen for this bookie: create its set; putIfAbsent keeps a racing winner
+ if (ledgers == null) {
+ ledgers = Collections.synchronizedSet(new HashSet<Long>());
+ Set<Long> oldLedgers = bookie2ledgersMap.putIfAbsent(bookie, ledgers);
+ if (oldLedgers != null) {
+ ledgers = oldLedgers;
+ }
+ }
+ ledgers.add(ledgerId);
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java?rev=1377703&r1=1377702&r2=1377703&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
Mon Aug 27 14:55:17 2012
@@ -60,4 +60,15 @@ public abstract class ReplicationExcepti
super(message);
}
}
+
+ /**
+ * Raised when auditing the bookie-to-ledgers mapping fails; wraps the underlying cause
+ */
+ static class BKAuditException extends ReplicationException {
+ private static final long serialVersionUID = 95551905L;
+
+ BKAuditException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java?rev=1377703&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
Mon Aug 27 14:55:17 2012
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
+import org.apache.bookkeeper.test.MultiLedgerManagerTestCase;
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies the bookie-to-ledger mapping generated by the BookieLedgerIndexer
+ */
+public class BookieLedgerIndexTest extends MultiLedgerManagerTestCase {
+
+ // Pick the amount of logging by uncommenting
+ // one of the two logger declarations below
+ // static Logger LOG = Logger.getRootLogger();
+ private static final Logger LOG = LoggerFactory
+ .getLogger(BookieLedgerIndexTest.class);
+
+ private byte[] ledgerPassword = "admin".getBytes();
+ private Random rng; // Random Number Generator
+ private ArrayList<byte[]> entries; // generated entries
+ private DigestType digestType;
+
+ public BookieLedgerIndexTest(String ledgerManagerFactory) {
+ super(3);
+ LOG.info("Running test case using ledger manager : "
+ + ledgerManagerFactory);
+ this.digestType = DigestType.CRC32;
+ // apply the factory class name to both the base and the client configuration
+ baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+ baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ rng = new Random(System.currentTimeMillis()); // time-seeded, so the
+ // generated entries differ per run
+ entries = new ArrayList<byte[]>(); // start with an empty entries list
+ }
+
+ /**
+ * Creates and closes three ledgers, then checks that the index contains 3
+ * bookies, each mapped to exactly the 3 ledgers that were created
+ */
+ @Test
+ public void testSimpleBookieLedgerMapping() throws Exception {
+ LedgerManagerFactory newLedgerManagerFactory = LedgerManagerFactory
+ .newLedgerManagerFactory(baseConf, zkc);
+ LedgerManager ledgerManager = newLedgerManagerFactory
+ .newLedgerManager();
+
+ List<Long> ledgerList = new ArrayList<Long>(3);
+ LedgerHandle lh = createAndAddEntriesToLedger();
+ lh.close();
+ ledgerList.add(lh.getId());
+
+ lh = createAndAddEntriesToLedger();
+ lh.close();
+ ledgerList.add(lh.getId());
+
+ lh = createAndAddEntriesToLedger();
+ lh.close();
+ ledgerList.add(lh.getId());
+
+ BookieLedgerIndexer bookieLedgerIndex = new BookieLedgerIndexer(
+ ledgerManager);
+
+ Map<String, Set<Long>> bookieToLedgerIndex = bookieLedgerIndex
+ .getBookieToLedgerIndex();
+
+ assertEquals("Missed few bookies in the bookie-ledger mapping!", 3,
+ bookieToLedgerIndex.size());
+ Collection<Set<Long>> bk2ledgerEntry = bookieToLedgerIndex.values();
+ for (Set<Long> ledgers : bk2ledgerEntry) {
+ assertEquals("Missed few ledgers in the bookie-ledger mapping!", 3,
+ ledgers.size());
+ for (Long ledgerId : ledgers) {
+ assertTrue("Unknown ledger-bookie mapping", ledgerList
+ .contains(ledgerId));
+ }
+ }
+ }
+
+ /**
+ * Verify that indexing throws BKAuditException once the ZooKeeper cluster is stopped
+ */
+ @Test
+ public void testWithoutZookeeper() throws Exception {
+ LedgerManagerFactory newLedgerManagerFactory = LedgerManagerFactory
+ .newLedgerManagerFactory(baseConf, zkc);
+ LedgerManager ledgerManager = newLedgerManagerFactory
+ .newLedgerManager();
+
+ List<Long> ledgerList = new ArrayList<Long>(3);
+ LedgerHandle lh = createAndAddEntriesToLedger();
+ lh.close();
+ ledgerList.add(lh.getId());
+
+ lh = createAndAddEntriesToLedger();
+ lh.close();
+ ledgerList.add(lh.getId());
+
+ lh = createAndAddEntriesToLedger();
+ lh.close();
+ ledgerList.add(lh.getId());
+
+ BookieLedgerIndexer bookieLedgerIndex = new BookieLedgerIndexer(
+ ledgerManager);
+ stopZKCluster();
+ try {
+ bookieLedgerIndex.getBookieToLedgerIndex();
+ fail("Must throw exception as bookies are not running!");
+ } catch (BKAuditException bkAuditException) {
+ // expected: ZooKeeper is stopped. NOTE(review): the fail() text above says "bookies"
+ }
+ }
+
+ /**
+ * Verify indexing after an ensemble change: a bookie is started and another shut down
+ */
+ @Test
+ public void testEnsembleReformation() throws Exception {
+ try {
+ LedgerManagerFactory newLedgerManagerFactory = LedgerManagerFactory
+ .newLedgerManagerFactory(baseConf, zkc);
+ LedgerManager ledgerManager = newLedgerManagerFactory
+ .newLedgerManager();
+
+ List<Long> ledgerList = new ArrayList<Long>(3);
+ LedgerHandle lh1 = createAndAddEntriesToLedger();
+ ledgerList.add(lh1.getId());
+ LedgerHandle lh2 = createAndAddEntriesToLedger();
+ ledgerList.add(lh2.getId());
+
+ startNewBookie();
+ shutdownBookie(bs.size() - 2);
+
+ // add a few more entries after the ensemble reformation
+ for (int i = 0; i < 10; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(Integer.MAX_VALUE));
+ entry.position(0);
+
+ entries.add(entry.array());
+ lh1.addEntry(entry.array());
+ lh2.addEntry(entry.array());
+ }
+
+ BookieLedgerIndexer bookieLedgerIndex = new BookieLedgerIndexer(
+ ledgerManager);
+
+ Map<String, Set<Long>> bookieToLedgerIndex = bookieLedgerIndex
+ .getBookieToLedgerIndex();
+ assertEquals("Missed few bookies in the bookie-ledger mapping!", 4,
+ bookieToLedgerIndex.size());
+ Collection<Set<Long>> bk2ledgerEntry = bookieToLedgerIndex.values();
+ for (Set<Long> ledgers : bk2ledgerEntry) {
+ assertEquals(
+ "Missed few ledgers in the bookie-ledger mapping!", 2,
+ ledgers.size());
+ for (Long ledgerNode : ledgers) {
+ assertTrue("Unknown ledger-bookie mapping", ledgerList
+ .contains(ledgerNode));
+ }
+ }
+ } catch (BKException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to BookKeeper exception");
+ } catch (InterruptedException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to interruption");
+ }
+ }
+
+ private void shutdownBookie(int bkShutdownIndex) throws IOException {
+ bs.remove(bkShutdownIndex).shutdown();
+ File f = tmpDirs.remove(bkShutdownIndex);
+ FileUtils.deleteDirectory(f);
+ }
+
+ private LedgerHandle createAndAddEntriesToLedger() throws BKException,
+ InterruptedException {
+ int numEntriesToWrite = 20;
+ // Create a ledger; it is returned still open, the caller decides when to close it
+ LedgerHandle lh = bkc.createLedger(digestType, ledgerPassword);
+ LOG.info("Ledger ID: " + lh.getId());
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(Integer.MAX_VALUE));
+ entry.position(0);
+
+ entries.add(entry.array());
+ lh.addEntry(entry.array());
+ }
+ return lh;
+ }
+}
|