zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1374195 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/test/java/org/apache/bookkeeper/client/
Date Fri, 17 Aug 2012 10:41:45 GMT
Author: ivank
Date: Fri Aug 17 10:41:45 2012
New Revision: 1374195

URL: http://svn.apache.org/viewvc?rev=1374195&view=rev
Log:
BOOKKEEPER-247: Detection of under replication (ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1374195&r1=1374194&r2=1374195&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Aug 17 10:41:45 2012
@@ -84,6 +84,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-246: Recording of underreplication of ledger entries (ivank)
 
+        BOOKKEEPER-247: Detection of under replication (ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java?rev=1374195&r1=1374194&r2=1374195&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java Fri Aug 17 10:41:45 2012
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,6 +15,7 @@ package org.apache.bookkeeper.client;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.client;
 
 /**
  * This interface determins how entries are distributed among bookies.
@@ -62,4 +61,16 @@ interface DistributionSchedule {
     }
 
     public QuorumCoverageSet getCoverageSet();
+    
+    /**
+     * Reports whether the given entry is present on the bookie at the given
+     * bookie index.
+     *
+     * @param entryId
+     *            id of the entry whose presence is being checked
+     * @param bookieIndex
+     *            index of the bookie to check for the possible presence of
+     *            the entry
+     * @return true if the bookie at that index has the entry, otherwise false.
+     */
+    public boolean hasEntry(long entryId, int bookieIndex);
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java?rev=1374195&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java Fri Aug 17 10:41:45 2012
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.Enumeration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Checks the complete ledger and finds the under-replicated fragments, if any.
+ */
+public class LedgerChecker {
+    private static Logger LOG = LoggerFactory.getLogger(LedgerChecker.class);
+
+    public final BookieClient bookieClient;
+
+    /**
+     * Signals a fragment whose stored-entry bounds are inconsistent: it
+     * reports no first stored entry but does report a last stored entry.
+     */
+    static class InvalidFragmentException extends Exception {
+        private static final long serialVersionUID = 1467201276417062353L;
+    }
+
+    /**
+     * Aggregates the results of several entry reads issued for one fragment
+     * and completes the wrapped callback exactly once: with OK after the
+     * expected number of reads have all succeeded, or with the error code of
+     * the first read seen to fail.
+     */
+    private static class ReadManyEntriesCallback implements ReadEntryCallback {
+        // Set once cb has been invoked, so that cb is never called twice.
+        AtomicBoolean completed = new AtomicBoolean(false);
+        // Number of successful reads still outstanding.
+        final AtomicLong numEntries;
+        final LedgerFragment fragment;
+        final GenericCallback<LedgerFragment> cb;
+
+        /**
+         * @param numEntries
+         *            number of successful reads required before cb is
+         *            completed with OK
+         * @param fragment
+         *            the fragment being verified; handed back to cb
+         * @param cb
+         *            callback to complete with the aggregated result
+         */
+        ReadManyEntriesCallback(long numEntries, LedgerFragment fragment,
+                GenericCallback<LedgerFragment> cb) {
+            this.numEntries = new AtomicLong(numEntries);
+            this.fragment = fragment;
+            this.cb = cb;
+        }
+
+        /**
+         * Records the outcome of one read. ledgerId, entryId, buffer and ctx
+         * are not used; only the result code matters.
+         */
+        public void readEntryComplete(int rc, long ledgerId, long entryId,
+                ChannelBuffer buffer, Object ctx) {
+            if (rc == BKException.Code.OK) {
+                // Success: report only when this was the last outstanding read
+                // and nothing has been reported yet.
+                if (numEntries.decrementAndGet() == 0
+                        && !completed.getAndSet(true)) {
+                    cb.operationComplete(rc, fragment);
+                }
+            } else if (!completed.getAndSet(true)) {
+                // Failure: report immediately; later results are ignored.
+                cb.operationComplete(rc, fragment);
+            }
+        }
+    }
+
+    /**
+     * Creates a checker that issues its reads through the bookie client
+     * obtained from the given BookKeeper instance.
+     *
+     * @param bkc
+     *            BookKeeper client whose bookie client is used for reads
+     */
+    public LedgerChecker(BookKeeper bkc) {
+        bookieClient = bkc.getBookieClient();
+    }
+
+    /**
+     * Verifies a single fragment by reading its boundary entries from the
+     * fragment's bookie.
+     *
+     * If the fragment reports no stored entries, cb is completed immediately
+     * with OK. Otherwise the first stored entry is read, plus the last stored
+     * entry when it differs from the first; entries in between are not read.
+     * The read results are aggregated by a ReadManyEntriesCallback, which
+     * reports the outcome to cb.
+     *
+     * @param fragment
+     *            the fragment to verify
+     * @param cb
+     *            callback receiving the result code and the fragment
+     * @throws InvalidFragmentException
+     *             if the fragment has no first stored entry but does have a
+     *             last stored entry
+     */
+    private void verifyLedgerFragment(LedgerFragment fragment,
+            GenericCallback<LedgerFragment> cb) throws InvalidFragmentException {
+        long firstStored = fragment.getFirstStoredEntryId();
+        long lastStored = fragment.getLastStoredEntryId();
+
+        // No first stored entry: there is nothing to read for this fragment.
+        if (firstStored == LedgerHandle.INVALID_ENTRY_ID) {
+            // A last stored entry without a first one is inconsistent.
+            if (lastStored != LedgerHandle.INVALID_ENTRY_ID) {
+                throw new InvalidFragmentException();
+            }
+            cb.operationComplete(BKException.Code.OK, fragment);
+            return;
+        }
+        if (firstStored == lastStored) {
+            // Only one stored entry: a single read is enough.
+            ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
+                    fragment, cb);
+            bookieClient.readEntry(fragment.getAddress(), fragment
+                    .getLedgerId(), firstStored, manycb, null);
+        } else {
+            // Read both ends of the stored range; both must succeed.
+            ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(2,
+                    fragment, cb);
+            bookieClient.readEntry(fragment.getAddress(), fragment
+                    .getLedgerId(), firstStored, manycb, null);
+            bookieClient.readEntry(fragment.getAddress(), fragment
+                    .getLedgerId(), lastStored, manycb, null);
+        }
+    }
+
+    /**
+     * Callback for checking whether an entry exists or not.
+     * It is used to differentiate the cases where it has been written
+     * but now cannot be read, and where it never has been written.
+     *
+     * Any result code other than NoSuchEntryException, including other
+     * errors, marks the entry as possibly existing. The wrapped callback is
+     * completed once the expected number of reads have reported back.
+     */
+    private static class EntryExistsCallback implements ReadEntryCallback {
+        // True as soon as any read returns something other than NoSuchEntry.
+        AtomicBoolean entryMayExist = new AtomicBoolean(false);
+        // Number of read results still outstanding.
+        final AtomicInteger numReads;
+        final GenericCallback<Boolean> cb;
+
+        /**
+         * @param numReads
+         *            number of read results to wait for before completing cb
+         * @param cb
+         *            callback told whether the entry may exist
+         */
+        EntryExistsCallback(int numReads,
+                            GenericCallback<Boolean> cb) {
+            this.numReads = new AtomicInteger(numReads);
+            this.cb = cb;
+        }
+
+        /**
+         * Records the outcome of one read. ledgerId, entryId, buffer and ctx
+         * are not used.
+         */
+        public void readEntryComplete(int rc, long ledgerId, long entryId,
+                                      ChannelBuffer buffer, Object ctx) {
+            if (rc != BKException.Code.NoSuchEntryException) {
+                entryMayExist.set(true);
+            }
+
+            // The rc passed on is that of whichever read completed last, not
+            // an aggregate of all the reads.
+            if (numReads.decrementAndGet() == 0) {
+                cb.operationComplete(rc, entryMayExist.get());
+            }
+        }
+    }
+
+    /**
+     * Collects the per-fragment verification results and, once the expected
+     * number of fragments have reported back, completes the wrapped callback
+     * with the set of fragments whose verification did not return OK.
+     */
+    private static class FullLedgerCallback implements
+            GenericCallback<LedgerFragment> {
+        // NOTE(review): plain HashSet with no synchronization; confirm that
+        // fragment callbacks are never delivered concurrently.
+        final Set<LedgerFragment> badFragments;
+        // Number of fragment results still outstanding.
+        final AtomicLong numFragments;
+        final GenericCallback<Set<LedgerFragment>> cb;
+
+        /**
+         * @param numFragments
+         *            number of fragment results to wait for
+         * @param cb
+         *            callback to complete with the set of bad fragments
+         */
+        FullLedgerCallback(long numFragments,
+                GenericCallback<Set<LedgerFragment>> cb) {
+            badFragments = new HashSet<LedgerFragment>();
+            this.numFragments = new AtomicLong(numFragments);
+            this.cb = cb;
+        }
+
+        /**
+         * Records the result for one fragment; a non-OK rc marks it as bad.
+         */
+        public void operationComplete(int rc, LedgerFragment result) {
+            if (rc != BKException.Code.OK) {
+                badFragments.add(result);
+            }
+            // The overall result code is always OK; failures are conveyed
+            // only through the contents of badFragments.
+            if (numFragments.decrementAndGet() == 0) {
+                cb.operationComplete(BKException.Code.OK, badFragments);
+            }
+        }
+    }
+
+    /**
+     * Checks all the fragments of the passed-in ledger and reports, through
+     * cb, those which failed verification.
+     *
+     * One fragment is built per bookie for every segment of the ledger's
+     * ensemble metadata. The fragments of the last segment are included
+     * unconditionally only when lastAddConfirmed is beyond the segment's
+     * first entry; otherwise the first entry is probed on its bookies first
+     * (see the comment in the body).
+     *
+     * @param lh
+     *            handle of the ledger to check
+     * @param cb
+     *            callback completed with the set of fragments that failed
+     *            verification
+     */
+    public void checkLedger(LedgerHandle lh,
+                            final GenericCallback<Set<LedgerFragment>> cb) {
+        // build a set of all fragment replicas
+        final Set<LedgerFragment> fragments = new HashSet<LedgerFragment>();
+
+        // Each segment ends one entry before the key of the following map
+        // entry, so a segment's fragments are created when its successor is
+        // visited. The last segment is handled after the loop.
+        // NOTE(review): this relies on getEnsembles() iterating in ascending
+        // key order - confirm.
+        Long curEntryId = null;
+        ArrayList<InetSocketAddress> curEnsemble = null;
+        for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : lh
+                .getLedgerMetadata().getEnsembles().entrySet()) {
+            if (curEntryId != null) {
+                for (int i = 0; i < curEnsemble.size(); i++) {
+                    fragments.add(new LedgerFragment(lh.getId(), curEntryId, e
+                            .getKey() - 1, i, curEnsemble, lh
+                            .getDistributionSchedule()));
+                }
+            }
+            curEntryId = e.getKey();
+            curEnsemble = e.getValue();
+        }
+
+        /* Checking the last segment of the ledger can be complicated in some cases.
+         * In the case that the ledger is closed, we can just check the fragments of
+         * the segment as normal.
+         * In the case that the ledger is open, but enough entries have been written,
+         * for lastAddConfirmed to be set above the start entry of the segment, we
+         * can also check as normal.
+         * However, if lastAddConfirmed cannot be trusted, such as when it's lower than
+         * the first entry id, or not set at all, we cannot be sure if there has been
+         * data written to the segment. For this reason, we have to send a read request
+         * to the bookies which should have the first entry. If they respond with
+         * NoSuchEntry we can assume it was never written. If they respond with anything
+         * else, we must assume the entry has been written, so we run the check.
+         */
+        if (curEntryId != null) {
+            long lastEntry = lh.getLastAddConfirmed();
+
+            // Clamp so that the last segment spans at least its first entry.
+            if (lastEntry < curEntryId) {
+                lastEntry = curEntryId;
+            }
+
+            final Set<LedgerFragment> finalSegmentFragments = new HashSet<LedgerFragment>();
+            for (int i = 0; i < curEnsemble.size(); i++) {
+                finalSegmentFragments.add(new LedgerFragment(lh.getId(), curEntryId,
+                                                  lastEntry, i, curEnsemble,
+                                                  lh.getDistributionSchedule()));
+            }
+
+            // Check for the case that no last confirmed entry has
+            // been set. This branch is also taken when lastAddConfirmed is
+            // exactly the first entry of the segment.
+            if (curEntryId == lastEntry) {
+                final long entryToRead = curEntryId;
+
+                // Once every probe has answered, include the last segment's
+                // fragments only if the entry may exist, then run the check.
+                EntryExistsCallback eecb
+                    = new EntryExistsCallback(lh.getLedgerMetadata().getQuorumSize(),
+                                              new GenericCallback<Boolean>() {
+                                                  public void operationComplete(int rc, Boolean result) {
+                                                      if (result) {
+                                                          fragments.addAll(finalSegmentFragments);
+                                                      }
+                                                      checkFragments(fragments, cb);
+                                                  }
+                                              });
+
+                // Probe the first entry on each bookie the distribution
+                // schedule assigns it to.
+                for (int i = 0; i < lh.getLedgerMetadata().getQuorumSize(); i++) {
+                    int bi = lh.getDistributionSchedule().getBookieIndex(entryToRead, i);
+                    InetSocketAddress addr = curEnsemble.get(bi);
+                    bookieClient.readEntry(addr, lh.getId(),
+                                           entryToRead, eecb, null);
+                }
+                // checkFragments is invoked from the probe callback above.
+                return;
+            } else {
+                fragments.addAll(finalSegmentFragments);
+            }
+        }
+
+        checkFragments(fragments, cb);
+    }
+
+    /**
+     * Verifies every fragment in the given set and completes cb with the
+     * subset that failed verification.
+     *
+     * An empty input completes cb immediately with OK and the same empty
+     * set. A fragment that verifyLedgerFragment rejects with
+     * InvalidFragmentException is reported to the aggregating callback with
+     * IncorrectParameterException, so it is counted as a bad fragment.
+     *
+     * @param fragments
+     *            the fragments to verify
+     * @param cb
+     *            callback completed with the set of bad fragments
+     */
+    private void checkFragments(Set<LedgerFragment> fragments,
+                                GenericCallback<Set<LedgerFragment>> cb) {
+        if (fragments.size() == 0) { // no fragments to verify
+            cb.operationComplete(BKException.Code.OK, fragments);
+            return;
+        }
+
+        // verify all the collected fragment replicas
+        FullLedgerCallback allFragmentsCb = new FullLedgerCallback(fragments
+                .size(), cb);
+        for (LedgerFragment r : fragments) {
+            LOG.debug("Checking fragment {}", r);
+            try {
+                verifyLedgerFragment(r, allFragmentsCb);
+            } catch (InvalidFragmentException ife) {
+                LOG.error("Invalid fragment found : {}", r);
+                // Still report a result for this fragment so that the
+                // aggregating callback's outstanding count reaches zero.
+                allFragmentsCb.operationComplete(
+                        BKException.Code.IncorrectParameterException, r);
+            }
+        }
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java?rev=1374195&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java Fri Aug 17 10:41:45 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.List;
+import java.net.InetSocketAddress;
+
+/**
+ * Represents the entries of a segment of a ledger which are stored on a single
+ * bookie in the segment's bookie ensemble.
+ *
+ * A fragment is identified by the ledger id, the entry range of the segment,
+ * the ensemble of the segment and the index of one bookie in that ensemble.
+ *
+ * Used for checking and recovery
+ */
+public class LedgerFragment {
+    // Index, within ensemble, of the bookie this fragment refers to.
+    final int bookieIndex;
+    final List<InetSocketAddress> ensemble;
+    // Inclusive bounds of the entry range covered by the fragment.
+    final long firstEntryId;
+    final long lastEntryId;
+    final long ledgerId;
+    // Used to decide which entries of the range are placed on the bookie.
+    final DistributionSchedule schedule;
+
+    /**
+     * @param ledgerId
+     *            id of the ledger the fragment belongs to
+     * @param firstEntryId
+     *            first entry id of the range covered by the fragment
+     * @param lastEntryId
+     *            last entry id of the range covered by the fragment
+     * @param bookieIndex
+     *            index, within the ensemble, of the fragment's bookie
+     * @param ensemble
+     *            bookie ensemble of the segment
+     * @param schedule
+     *            distribution schedule consulted for entry placement
+     */
+    LedgerFragment(long ledgerId, long firstEntryId, long lastEntryId,
+            int bookieIndex, List<InetSocketAddress> ensemble,
+            DistributionSchedule schedule) {
+        this.ledgerId = ledgerId;
+        this.firstEntryId = firstEntryId;
+        this.lastEntryId = lastEntryId;
+        this.bookieIndex = bookieIndex;
+        this.ensemble = ensemble;
+        this.schedule = schedule;
+    }
+
+    /**
+     * Gets the id of the ledger this fragment belongs to.
+     */
+    long getLedgerId() {
+        return ledgerId;
+    }
+
+    /**
+     * Gets the address of this fragment's bookie, i.e. the ensemble member
+     * at the fragment's bookie index.
+     */
+    public InetSocketAddress getAddress() {
+        return ensemble.get(bookieIndex);
+    }
+
+    /**
+     * Gets the id of the first entry in the fragment's range which the
+     * distribution schedule places on this fragment's bookie.
+     *
+     * The search walks forward from the first entry id, examining at most
+     * ensemble-size entries and never going past the last entry id.
+     *
+     * @return the entry id, or LedgerHandle.INVALID_ENTRY_ID if no such
+     *         entry was found
+     */
+    public long getFirstStoredEntryId() {
+        long firstEntry = firstEntryId;
+
+        for (int i = 0; i < ensemble.size() && firstEntry <= lastEntryId; i++) {
+            if (schedule.hasEntry(firstEntry, bookieIndex)) {
+                return firstEntry;
+            } else {
+                firstEntry++;
+            }
+        }
+        return LedgerHandle.INVALID_ENTRY_ID;
+    }
+
+    /**
+     * Gets the id of the last entry in the fragment's range which the
+     * distribution schedule places on this fragment's bookie.
+     *
+     * The search walks backward from the last entry id, examining at most
+     * ensemble-size entries and never going before the first entry id.
+     *
+     * @return the entry id, or LedgerHandle.INVALID_ENTRY_ID if no such
+     *         entry was found
+     */
+    public long getLastStoredEntryId() {
+        long lastEntry = lastEntryId;
+        for (int i = 0; i < ensemble.size() && lastEntry >= firstEntryId; i++) {
+            if (schedule.hasEntry(lastEntry, bookieIndex)) {
+                return lastEntry;
+            } else {
+                lastEntry--;
+            }
+        }
+        return LedgerHandle.INVALID_ENTRY_ID;
+    }
+
+    /**
+     * Gets the ensemble of the fragment.
+     *
+     * @return the ensemble list this fragment was constructed with (the same
+     *         instance, not a copy)
+     */
+    public List<InetSocketAddress> getEnsemble() {
+        return this.ensemble;
+    }
+
+    /**
+     * Formats the ledger id, the range bounds (each followed in brackets by
+     * the corresponding stored entry id) and the bookie address.
+     */
+    @Override
+    public String toString() {
+        return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
+                + "LastEntryID: %d[%d], Host: %s)", ledgerId, firstEntryId,
+                getFirstStoredEntryId(), lastEntryId, getLastStoredEntryId(),
+                getAddress());
+    }
+}
\ No newline at end of file

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java?rev=1374195&r1=1374194&r2=1374195&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java Fri Aug 17 10:41:45 2012
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,6 +15,7 @@ package org.apache.bookkeeper.client;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.client;
 
 import org.apache.bookkeeper.util.MathUtils;
 
@@ -87,4 +86,9 @@ class RoundRobinDistributionSchedule imp
     public QuorumCoverageSet getCoverageSet() {
         return new RRQuorumCoverageSet();
     }
+    
+    /**
+     * Reports whether getReplicaIndex yields a value other than -1 for the
+     * given entry and bookie index.
+     * NOTE(review): presumably -1 is getReplicaIndex's marker for "this
+     * bookie holds no replica of the entry" - confirm against that method.
+     */
+    @Override
+    public boolean hasEntry(long entryId, int bookieIndex) {
+        return getReplicaIndex(entryId, bookieIndex) != -1;
+    }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java?rev=1374195&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java Fri Aug 17 10:41:45 2012
@@ -0,0 +1,371 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file 
+ * distributed with this work for additional information 
+ * regarding copyright ownership.  The ASF licenses this file 
+ * to you under the Apache License, Version 2.0 (the 
+ * "License"); you may not use this file except in compliance 
+ * with the License.  You may obtain a copy of the License at 
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0 
+ * 
+ * Unless required by applicable law or agreed to in writing, 
+ * software distributed under the License is distributed on an 
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations 
+ * under the License. 
+ * 
+ */
+package org.apache.bookkeeper.client;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the functionality of LedgerChecker. This Ledger checker should be able
+ * to detect the correct underReplicated fragment
+ */
+public class TestLedgerChecker extends BookKeeperClusterTestCase {
+    private static final byte[] TEST_LEDGER_ENTRY_DATA = "TestCheckerData"
+            .getBytes();
+    private static final byte[] TEST_LEDGER_PASSWORD = "testpasswd".getBytes();
+    static Logger LOG = LoggerFactory.getLogger(TestLedgerChecker.class);
+
+    /**
+     * Passes 3 to the BookKeeperClusterTestCase constructor.
+     * NOTE(review): presumably the number of bookies the test cluster starts
+     * with - confirm against BookKeeperClusterTestCase.
+     */
+    public TestLedgerChecker() {
+        super(3);
+    }
+
+    /**
+     * Callback that stores the set of fragments it is completed with and
+     * lets a test block until that has happened. The result code is ignored.
+     */
+    class CheckerCallback implements GenericCallback<Set<LedgerFragment>> {
+        private Set<LedgerFragment> result = null;
+        // Released once operationComplete has been called.
+        private CountDownLatch latch = new CountDownLatch(1);
+
+        public void operationComplete(int rc, Set<LedgerFragment> result) {
+            this.result = result;
+            latch.countDown();
+        }
+
+        /**
+         * Blocks until the callback has completed, then returns the stored
+         * result.
+         */
+        Set<LedgerFragment> waitAndGetResult() throws InterruptedException {
+            latch.await();
+            return result;
+        }
+    }
+
+    /**
+     * Tests that the LedgerChecker detects the under-replicated fragments as
+     * bookies of the first ensemble are killed one after another: one
+     * fragment is expected after the first kill, three after the second.
+     */
+    @Test
+    public void testChecker() throws Exception {
+
+        LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.CRC32,
+                TEST_LEDGER_PASSWORD);
+        startNewBookie();
+
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(TEST_LEDGER_ENTRY_DATA);
+        }
+        // Kill the first bookie of the ensemble that starts at entry 0.
+        InetSocketAddress replicaToKill = lh.getLedgerMetadata().getEnsembles()
+                .get(0L).get(0);
+        LOG.info("Killing {}", replicaToKill);
+        killBookie(replicaToKill);
+
+        // Keep writing after the bookie has been killed.
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(TEST_LEDGER_ENTRY_DATA);
+        }
+
+        Set<LedgerFragment> result = getUnderReplicatedFragments(lh);
+        assertNotNull("Result shouldn't be null", result);
+        for (LedgerFragment r : result) {
+            LOG.info("unreplicated fragment: {}", r);
+        }
+        assertEquals("Should have one missing fragment", 1, result.size());
+        assertEquals("Fragment should be missing from first replica", result
+                .iterator().next().getAddress(), replicaToKill);
+
+        // Kill the second bookie of the same original ensemble and re-check.
+        InetSocketAddress replicaToKill2 = lh.getLedgerMetadata()
+                .getEnsembles().get(0L).get(1);
+        LOG.info("Killing {}", replicaToKill2);
+        killBookie(replicaToKill2);
+
+        result = getUnderReplicatedFragments(lh);
+        assertNotNull("Result shouldn't be null", result);
+        for (LedgerFragment r : result) {
+            LOG.info("unreplicated fragment: {}", r);
+        }
+        assertEquals("Should have three missing fragments", 3, result.size());
+    }
+
+    /**
+     * Tests that the ledger checker reports a fragment as bad only if some
+     * entry of the fragment does not meet the quorum.
+     */
+    // Ensemble = 3, Quorum = 2.
+    // Sample ledger metadata should look like:
+    //   0 a b c   (entry present in a, b. Now kill c)
+    //   1 a b d
+    // Here, even though one bookie failed at this stage, we don't have any
+    // missed entries. Quorum is satisfied, so there should not be any
+    // missing replicas.
+    @Test(timeout = 3000)
+    public void testShouldNotGetTheFragmentIfThereIsNoMissedEntry()
+            throws Exception {
+
+        LedgerHandle lh = bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32,
+                TEST_LEDGER_PASSWORD);
+        lh.addEntry(TEST_LEDGER_ENTRY_DATA);
+
+        // Entry should have been added to the first 2 bookies.
+
+        // Kill the 3rd bookie of the ensemble.
+        ArrayList<InetSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+                .getEnsembles().get(0L);
+        InetSocketAddress lastBookieFromEnsemble = firstEnsemble.get(2);
+        LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
+                + firstEnsemble);
+        killBookie(lastBookieFromEnsemble);
+
+        startNewBookie();
+
+        LOG.info("Ensembles after first entry :"
+                + lh.getLedgerMetadata().getEnsembles());
+
+        // Adding one more entry. Here the ensemble should be reformed.
+        lh.addEntry(TEST_LEDGER_ENTRY_DATA);
+
+        LOG.info("Ensembles after second entry :"
+                + lh.getLedgerMetadata().getEnsembles());
+
+        Set<LedgerFragment> result = getUnderReplicatedFragments(lh);
+
+        assertNotNull("Result shouldn't be null", result);
+
+        for (LedgerFragment r : result) {
+            LOG.info("unreplicated fragment: {}", r);
+        }
+
+        assertEquals("Should not have any missing fragment", 0, result.size());
+    }
+
+    /**
+     * Tests that LedgerChecker reports two fragments when two bookies of the
+     * same ensemble have failed, with ensemble = 3 and quorum = 2.
+     */
+    @Test(timeout = 3000)
+    public void testShouldGetTwoFrgamentsIfTwoBookiesFailedInSameEnsemble()
+            throws Exception {
+
+        LedgerHandle lh = bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32,
+                TEST_LEDGER_PASSWORD);
+        startNewBookie();
+        startNewBookie();
+        lh.addEntry(TEST_LEDGER_ENTRY_DATA);
+
+        ArrayList<InetSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+                .getEnsembles().get(0L);
+
+        // Kill the first two bookies of the ensemble that starts at entry 0.
+        InetSocketAddress firstBookieFromEnsemble = firstEnsemble.get(0);
+        killBookie(firstEnsemble, firstBookieFromEnsemble);
+
+        InetSocketAddress secondBookieFromEnsemble = firstEnsemble.get(1);
+        killBookie(firstEnsemble, secondBookieFromEnsemble);
+        // Write one more entry after both bookies have been killed.
+        lh.addEntry(TEST_LEDGER_ENTRY_DATA);
+        Set<LedgerFragment> result = getUnderReplicatedFragments(lh);
+
+        assertNotNull("Result shouldn't be null", result);
+
+        for (LedgerFragment r : result) {
+            LOG.info("unreplicated fragment: {}", r);
+        }
+
+        assertEquals("There should be 2 fragments", 2, result.size());
+    }
+
+    /**
+     * Tests that LedgerChecker does not report any under-replicated
+     * fragments if the corresponding ledger no longer exists.
+     */
+    @Test(timeout = 3000)
+    public void testShouldNotGetAnyFragmentIfNoLedgerPresent()
+            throws Exception {
+
+        LedgerHandle lh = bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32,
+                TEST_LEDGER_PASSWORD);
+
+        ArrayList<InetSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+                .getEnsembles().get(0L);
+        InetSocketAddress firstBookieFromEnsemble = firstEnsemble.get(0);
+        killBookie(firstBookieFromEnsemble);
+        startNewBookie();
+        lh.addEntry(TEST_LEDGER_ENTRY_DATA);
+        // Delete the ledger, then check it through the still-open handle.
+        bkc.deleteLedger(lh.getId());
+
+        Set<LedgerFragment> result = getUnderReplicatedFragments(lh);
+        assertNotNull("Result shouldn't be null", result);
+
+        assertEquals("There should be 0 fragments. But returned fragments are "
+                + result, 0, result.size());
+    }
+
+    /**
+     * Tests that LedgerChecker reports as many fragments as there are failed
+     * bookies in the ensemble: all three bookies of the first ensemble are
+     * killed after three entries have been written, and three fragments are
+     * expected.
+     */
+    @Test(timeout = 3000)
+    public void testShouldGetFailedEnsembleNumberOfFgmntsIfEnsembleBookiesFailedOnNextWrite()
+            throws Exception {
+
+        startNewBookie();
+        startNewBookie();
+        LedgerHandle lh = bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32,
+                TEST_LEDGER_PASSWORD);
+        for (int i = 0; i < 3; i++) {
+            lh.addEntry(TEST_LEDGER_ENTRY_DATA);
+        }
+
+        // Kill all three bookies
+        ArrayList<InetSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+                .getEnsembles().get(0L);
+        for (InetSocketAddress bkAddr : firstEnsemble) {
+            killBookie(firstEnsemble, bkAddr);
+        }
+
+        Set<LedgerFragment> result = getUnderReplicatedFragments(lh);
+
+        assertNotNull("Result shouldn't be null", result);
+
+        for (LedgerFragment r : result) {
+            LOG.info("unreplicated fragment: {}", r);
+        }
+
+        assertEquals("There should be 3 fragments", 3, result.size());
+    }
+
+    /**
+     * Tests that LedgerChecker does not report any fragment as
+     * under-replicated if the ledger itself is empty: no entry is written
+     * and no bookie is killed before the check.
+     */
+    @Test(timeout = 3000)
+    public void testShouldNotGetAnyFragmentWithEmptyLedger() throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32,
+                TEST_LEDGER_PASSWORD);
+        Set<LedgerFragment> result = getUnderReplicatedFragments(lh);
+        assertNotNull("Result shouldn't be null", result);
+        assertEquals("There should be 0 fragments. But returned fragments are "
+                + result, 0, result.size());
+    }
+
+    /**
+     * Tests that LedgerChecker reports fragments for an empty ledger when
+     * all bookies in its ensemble are down, because there is then no way to
+     * tell whether data was written or not.
+     * Only two fragments are expected: the quorum is 2 and only the first
+     * entry of the ledger is suspected to exist.
+     */
+    @Test(timeout = 3000)
+    public void testShouldGet2FragmentsWithEmptyLedgerButBookiesDead() throws Exception {
+        LedgerHandle emptyLedger = bkc.createLedger(3, 2,
+                BookKeeper.DigestType.CRC32, TEST_LEDGER_PASSWORD);
+
+        // Kill every bookie of the ensemble without writing a single entry.
+        ArrayList<InetSocketAddress> ensemble =
+                emptyLedger.getLedgerMetadata().getEnsembles().get(0L);
+        for (InetSocketAddress bookie : ensemble) {
+            killBookie(bookie);
+        }
+
+        Set<LedgerFragment> fragments = getUnderReplicatedFragments(emptyLedger);
+        assertNotNull("Result shouldn't be null", fragments);
+        assertEquals("There should be 2 fragments.", 2, fragments.size());
+    }
+
+    /**
+     * Tests that LedgerChecker reports one fragment as under-replicated
+     * for an open ledger holding a single entry, after the bookie at
+     * index 0 of its ensemble has been killed.
+     */
+    @Test(timeout = 3000)
+    public void testShouldGetOneFragmentWithSingleEntryOpenedLedger() throws Exception {
+        LedgerHandle writer = bkc.createLedger(3, 3,
+                BookKeeper.DigestType.CRC32, TEST_LEDGER_PASSWORD);
+        writer.addEntry(TEST_LEDGER_ENTRY_DATA);
+
+        ArrayList<InetSocketAddress> ensemble =
+                writer.getLedgerMetadata().getEnsembles().get(0L);
+        // The victim is the bookie at index 0 of the ensemble.
+        InetSocketAddress victim = ensemble.get(0);
+        LOG.info("Killing " + victim + " from ensemble=" + ensemble);
+        killBookie(victim);
+
+        startNewBookie();
+
+        // The checker works on a separate, non-recovering handle; the
+        // writing handle is never closed.
+        LedgerHandle reader = bkc.openLedgerNoRecovery(writer.getId(),
+                BookKeeper.DigestType.CRC32, TEST_LEDGER_PASSWORD);
+
+        Set<LedgerFragment> fragments = getUnderReplicatedFragments(reader);
+        assertNotNull("Result shouldn't be null", fragments);
+        String failureMessage =
+                "There should be 1 fragment. But returned fragments are " + fragments;
+        assertEquals(failureMessage, 1, fragments.size());
+    }
+
+    /**
+     * Tests that LedgerChecker correctly identifies missing fragments
+     * when a single entry is written after an ensemble change.
+     * This is important, as the last add confirmed may be less than the
+     * first entry id of the final segment.
+     */
+    @Test(timeout = 3000)
+    public void testSingleEntryAfterEnsembleChange() throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
+                TEST_LEDGER_PASSWORD);
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(TEST_LEDGER_ENTRY_DATA);
+        }
+        ArrayList<InetSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+                .getEnsembles().get(0L);
+        InetSocketAddress lastBookieFromEnsemble = firstEnsemble.get(
+                lh.getDistributionSchedule().getBookieIndex(lh.getLastAddPushed(), 0));
+        LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
+                + firstEnsemble);
+        killBookie(lastBookieFromEnsemble);
+        startNewBookie();
+
+        lh.addEntry(TEST_LEDGER_ENTRY_DATA);
+
+        lastBookieFromEnsemble = firstEnsemble.get(
+                lh.getDistributionSchedule().getBookieIndex(lh.getLastAddPushed(), 1));
+        LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
+                + firstEnsemble);
+        killBookie(lastBookieFromEnsemble);
+
+        //Open ledger separately for Ledger checker.
+        LedgerHandle lh1 =bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32,
+                TEST_LEDGER_PASSWORD);
+
+        Set<LedgerFragment> result = getUnderReplicatedFragments(lh1);
+        assertNotNull("Result shouldn't be null", result);
+        assertEquals("There should be 3 fragment. But returned fragments are "
+                + result, 3, result.size());
+    }
+
+    /**
+     * Runs a {@link LedgerChecker} built on the test client over the given
+     * ledger and hands back whatever the {@code CheckerCallback} collected.
+     *
+     * @param lh handle of the ledger to check
+     * @return the value returned by the callback's {@code waitAndGetResult()}
+     * @throws InterruptedException propagated from waiting on the callback
+     */
+    private Set<LedgerFragment> getUnderReplicatedFragments(LedgerHandle lh)
+            throws InterruptedException {
+        LedgerChecker ledgerChecker = new LedgerChecker(bkc);
+        CheckerCallback callback = new CheckerCallback();
+        ledgerChecker.checkLedger(lh, callback);
+        return callback.waitAndGetResult();
+    }
+
+    /**
+     * Logs which bookie is about to be killed, together with the ensemble
+     * it was taken from, then delegates to {@code killBookie(InetSocketAddress)}.
+     *
+     * @param firstEnsemble ensemble the bookie belongs to; used only in the log line
+     * @param ensemble address of the single bookie to kill (despite its name)
+     * @throws InterruptedException propagated from the delegate
+     */
+    private void killBookie(ArrayList<InetSocketAddress> firstEnsemble,
+            InetSocketAddress ensemble) throws InterruptedException {
+        String message = "Killing " + ensemble + " from ensemble=" + firstEnsemble;
+        LOG.info(message);
+        killBookie(ensemble);
+    }
+
+}



Mime
View raw message