zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1377703 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/
Date Mon, 27 Aug 2012 14:55:17 GMT
Author: ivank
Date: Mon Aug 27 14:55:17 2012
New Revision: 1377703

URL: http://svn.apache.org/viewvc?rev=1377703&view=rev
Log:
BOOKKEEPER-304: Prepare bookie vs ledgers cache and will be used by the Auditor (rakeshr via
ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1377703&r1=1377702&r2=1377703&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Aug 27 14:55:17 2012
@@ -102,6 +102,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-248: Rereplicating of under replicated data (umamahesh via ivank)
 
+        BOOKKEEPER-304: Prepare bookie vs ledgers cache and will be used by the Auditor (rakeshr via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java?rev=1377703&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java Mon Aug 27 14:55:17 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Preparing bookie vs its corresponding ledgers. This will always look up the
+ * ledgermanager for ledger metadata and will generate indexes.
+ */
+public class BookieLedgerIndexer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BookieLedgerIndexer.class);
+    private final LedgerManager ledgerManager;
+
+    public BookieLedgerIndexer(LedgerManager ledgerManager) {
+        this.ledgerManager = ledgerManager;
+    }
+
+    /**
+     * Generating bookie vs its ledgers map by reading all the ledgers in each
+     * bookie and parsing its metadata.
+     * 
+     * @return bookie2ledgersMap map of bookie vs ledgers
+     * @throws BKAuditException
+     *             exception while getting bookie-ledgers
+     */
+    public Map<String, Set<Long>> getBookieToLedgerIndex()
+            throws BKAuditException {
+        // bookie vs ledgers map
+        final ConcurrentHashMap<String, Set<Long>> bookie2ledgersMap
+            = new ConcurrentHashMap<String, Set<Long>>();
+        final CountDownLatch ledgerCollectorLatch = new CountDownLatch(1);
+
+        Processor<Long> ledgerProcessor = new Processor<Long>() {
+            @Override
+            public void process(final Long ledgerId,
+                    final AsyncCallback.VoidCallback iterCallback) {
+                GenericCallback<LedgerMetadata> genericCallback = new GenericCallback<LedgerMetadata>()
{
+                    @Override
+                    public void operationComplete(final int rc,
+                            LedgerMetadata ledgerMetadata) {
+                        if (rc == BKException.Code.OK) {
+                            StringBuilder bookieAddr;
+                            for (Map.Entry<Long, ArrayList<InetSocketAddress>>
ensemble : ledgerMetadata
+                                    .getEnsembles().entrySet()) {
+                                for (InetSocketAddress bookie : ensemble
+                                        .getValue()) {
+                                    bookieAddr = new StringBuilder();
+                                    StringUtils
+                                            .addrToString(bookieAddr, bookie);
+                                    putLedger(bookie2ledgersMap, bookieAddr
+                                            .toString(), ledgerId);
+                                }
+                            }
+                        } else {
+                            LOG.warn("Unable to read the ledger:" + ledgerId
+                                    + " information");
+                        }
+                        iterCallback.processResult(rc, null, null);
+                    }
+                };
+                ledgerManager.readLedgerMetadata(ledgerId, genericCallback);
+            }
+        };
+        // Reading the result after processing all the ledgers
+        final List<Integer> resultCode = new ArrayList<Integer>(1);
+        ledgerManager.asyncProcessLedgers(ledgerProcessor,
+                new AsyncCallback.VoidCallback() {
+
+                    @Override
+                    public void processResult(int rc, String s, Object obj) {
+                        resultCode.add(rc);
+                        ledgerCollectorLatch.countDown();
+                    }
+                }, null, BKException.Code.OK, BKException.Code.ReadException);
+        try {
+            ledgerCollectorLatch.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new BKAuditException(
+                    "Exception while getting the bookie-ledgers", e);
+        }
+        if (!resultCode.contains(BKException.Code.OK)) {
+            throw new BKAuditException(
+                    "Exception while getting the bookie-ledgers", BKException
+                            .create(resultCode.get(0)));
+        }
+        return bookie2ledgersMap;
+    }
+
+    private void putLedger(ConcurrentHashMap<String, Set<Long>> bookie2ledgersMap,
+            String bookie, long ledgerId) {
+        Set<Long> ledgers = bookie2ledgersMap.get(bookie);
+        // creates an empty list and add to bookie for keeping its ledgers
+        if (ledgers == null) {
+            ledgers = Collections.synchronizedSet(new HashSet<Long>());
+            Set<Long> oldLedgers = bookie2ledgersMap.putIfAbsent(bookie, ledgers);
+            if (oldLedgers != null) {
+                ledgers = oldLedgers;
+            }
+        }
+        ledgers.add(ledgerId);
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java?rev=1377703&r1=1377702&r2=1377703&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java Mon Aug 27 14:55:17 2012
@@ -60,4 +60,15 @@ public abstract class ReplicationExcepti
             super(message);
         }
     }
+
+    /**
+     * Exception raised while auditing the bookie-ledgers mapping. The class
+     * and its constructor are package-private.
+     */
+    static class BKAuditException extends ReplicationException {
+        private static final long serialVersionUID = 95551905L;
+
+        /**
+         * @param message
+         *            description of the audit failure
+         * @param cause
+         *            underlying exception, passed on to the superclass
+         */
+        BKAuditException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java?rev=1377703&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java Mon Aug 27 14:55:17 2012
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
+import org.apache.bookkeeper.test.MultiLedgerManagerTestCase;
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests verifies bookie vs ledger mapping generating by the BookieLedgerIndexer
+ */
+public class BookieLedgerIndexTest extends MultiLedgerManagerTestCase {
+
+    // Depending on the taste, select the amount of logging
+    // by decommenting one of the two lines below
+    // static Logger LOG = Logger.getRootLogger();
+    private static final Logger LOG = LoggerFactory
+            .getLogger(BookieLedgerIndexTest.class);
+
+    private byte[] ledgerPassword = "admin".getBytes();
+    private Random rng; // Random Number Generator
+    private ArrayList<byte[]> entries; // generated entries
+    private DigestType digestType;
+
+    public BookieLedgerIndexTest(String ledgerManagerFactory) {
+        super(3);
+        LOG.info("Running test case using ledger manager : "
+                + ledgerManagerFactory);
+        this.digestType = DigestType.CRC32;
+        // set ledger manager name
+        baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+        baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        rng = new Random(System.currentTimeMillis()); // Initialize the Random
+        // Number Generator
+        entries = new ArrayList<byte[]>(); // initialize the entries list
+    }
+
+    /**
+     * Verify the bookie-ledger mapping with minimum number of bookies and few
+     * ledgers
+     */
+    @Test
+    public void testSimpleBookieLedgerMapping() throws Exception {
+        LedgerManagerFactory newLedgerManagerFactory = LedgerManagerFactory
+                .newLedgerManagerFactory(baseConf, zkc);
+        LedgerManager ledgerManager = newLedgerManagerFactory
+                .newLedgerManager();
+
+        List<Long> ledgerList = new ArrayList<Long>(3);
+        LedgerHandle lh = createAndAddEntriesToLedger();
+        lh.close();
+        ledgerList.add(lh.getId());
+
+        lh = createAndAddEntriesToLedger();
+        lh.close();
+        ledgerList.add(lh.getId());
+
+        lh = createAndAddEntriesToLedger();
+        lh.close();
+        ledgerList.add(lh.getId());
+
+        BookieLedgerIndexer bookieLedgerIndex = new BookieLedgerIndexer(
+                ledgerManager);
+
+        Map<String, Set<Long>> bookieToLedgerIndex = bookieLedgerIndex
+                .getBookieToLedgerIndex();
+
+        assertEquals("Missed few bookies in the bookie-ledger mapping!", 3,
+                bookieToLedgerIndex.size());
+        Collection<Set<Long>> bk2ledgerEntry = bookieToLedgerIndex.values();
+        for (Set<Long> ledgers : bk2ledgerEntry) {
+            assertEquals("Missed few ledgers in the bookie-ledger mapping!", 3,
+                    ledgers.size());
+            for (Long ledgerId : ledgers) {
+                assertTrue("Unknown ledger-bookie mapping", ledgerList
+                        .contains(ledgerId));
+            }
+        }
+    }
+
+    /**
+     * Verify ledger index with failed bookies and throws exception
+     */
+    @Test
+    public void testWithoutZookeeper() throws Exception {
+        LedgerManagerFactory newLedgerManagerFactory = LedgerManagerFactory
+                .newLedgerManagerFactory(baseConf, zkc);
+        LedgerManager ledgerManager = newLedgerManagerFactory
+                .newLedgerManager();
+
+        List<Long> ledgerList = new ArrayList<Long>(3);
+        LedgerHandle lh = createAndAddEntriesToLedger();
+        lh.close();
+        ledgerList.add(lh.getId());
+
+        lh = createAndAddEntriesToLedger();
+        lh.close();
+        ledgerList.add(lh.getId());
+
+        lh = createAndAddEntriesToLedger();
+        lh.close();
+        ledgerList.add(lh.getId());
+
+        BookieLedgerIndexer bookieLedgerIndex = new BookieLedgerIndexer(
+                ledgerManager);
+        stopZKCluster();
+        try {
+            bookieLedgerIndex.getBookieToLedgerIndex();
+            fail("Must throw exception as bookies are not running!");
+        } catch (BKAuditException bkAuditException) {
+            // expected behaviour
+        }
+    }
+
+    /**
+     * Verify indexing with multiple ensemble reformation
+     */
+    @Test
+    public void testEnsembleReformation() throws Exception {
+        try {
+            LedgerManagerFactory newLedgerManagerFactory = LedgerManagerFactory
+                    .newLedgerManagerFactory(baseConf, zkc);
+            LedgerManager ledgerManager = newLedgerManagerFactory
+                    .newLedgerManager();
+
+            List<Long> ledgerList = new ArrayList<Long>(3);
+            LedgerHandle lh1 = createAndAddEntriesToLedger();
+            ledgerList.add(lh1.getId());
+            LedgerHandle lh2 = createAndAddEntriesToLedger();
+            ledgerList.add(lh2.getId());
+
+            startNewBookie();
+            shutdownBookie(bs.size() - 2);
+
+            // add few more entries after ensemble reformation
+            for (int i = 0; i < 10; i++) {
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(Integer.MAX_VALUE));
+                entry.position(0);
+
+                entries.add(entry.array());
+                lh1.addEntry(entry.array());
+                lh2.addEntry(entry.array());
+            }
+
+            BookieLedgerIndexer bookieLedgerIndex = new BookieLedgerIndexer(
+                    ledgerManager);
+
+            Map<String, Set<Long>> bookieToLedgerIndex = bookieLedgerIndex
+                    .getBookieToLedgerIndex();
+            assertEquals("Missed few bookies in the bookie-ledger mapping!", 4,
+                    bookieToLedgerIndex.size());
+            Collection<Set<Long>> bk2ledgerEntry = bookieToLedgerIndex.values();
+            for (Set<Long> ledgers : bk2ledgerEntry) {
+                assertEquals(
+                        "Missed few ledgers in the bookie-ledger mapping!", 2,
+                        ledgers.size());
+                for (Long ledgerNode : ledgers) {
+                    assertTrue("Unknown ledger-bookie mapping", ledgerList
+                            .contains(ledgerNode));
+                }
+            }
+        } catch (BKException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to BookKeeper exception");
+        } catch (InterruptedException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to interruption");
+        }
+    }
+
+    private void shutdownBookie(int bkShutdownIndex) throws IOException {
+        bs.remove(bkShutdownIndex).shutdown();
+        File f = tmpDirs.remove(bkShutdownIndex);
+        FileUtils.deleteDirectory(f);
+    }
+
+    private LedgerHandle createAndAddEntriesToLedger() throws BKException,
+            InterruptedException {
+        int numEntriesToWrite = 20;
+        // Create a ledger
+        LedgerHandle lh = bkc.createLedger(digestType, ledgerPassword);
+        LOG.info("Ledger ID: " + lh.getId());
+        for (int i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(Integer.MAX_VALUE));
+            entry.position(0);
+
+            entries.add(entry.array());
+            lh.addEntry(entry.array());
+        }
+        return lh;
+    }
+}



Mime
View raw message