bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eolive...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-1019: Support for reading entries after LAC
Date Tue, 02 May 2017 20:07:44 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master c72ff5efb -> 9c79e078b


BOOKKEEPER-1019: Support for reading entries after LAC

This patch introduces new client-side read methods (LedgerHandle.readUnconfirmedEntries and
LedgerHandle.asyncReadUnconfirmedEntries) to allow reads outside the boundary of the local LastAddConfirmed value.

Author: eolivelli <eolivelli@apache.org>

Reviewers: Sijie Guo <sijie@apache.org>, Matteo Merli <mmerli@apache.org>

Closes #121 from eolivelli/BOOKKEEPER-1019-read-after-lac


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/9c79e078
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/9c79e078
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/9c79e078

Branch: refs/heads/master
Commit: 9c79e078b8cfefc24251aefcb727760fb99229ed
Parents: c72ff5e
Author: eolivelli <eolivelli@apache.org>
Authored: Tue May 2 22:07:24 2017 +0200
Committer: Enrico Olivelli <eolivelli@apache.org>
Committed: Tue May 2 22:07:24 2017 +0200

----------------------------------------------------------------------
 .../apache/bookkeeper/client/LedgerHandle.java  |  63 ++++++
 .../bookkeeper/conf/ClientConfiguration.java    |   1 +
 .../bookkeeper/client/BookKeeperTest.java       | 220 ++++++++++++++++++-
 3 files changed, 283 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9c79e078/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index b68bb11..d1e5540 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -470,6 +470,7 @@ public class LedgerHandle implements AutoCloseable {
      * @param lastEntry
      *          id of last entry of sequence (included)
      *
+     * @see #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback,
java.lang.Object)
      */
     public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
             throws InterruptedException, BKException {
@@ -481,6 +482,29 @@ public class LedgerHandle implements AutoCloseable {
     }
 
     /**
+     * Read a sequence of entries synchronously, allowing reads beyond the LastAddConfirmed
range.<br>
+     * This is the synchronous counterpart of
+     * {@link #asyncReadUnconfirmedEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback,
java.lang.Object) }
+     *
+     * @param firstEntry
+     *          id of first entry of sequence (included)
+     * @param lastEntry
+     *          id of last entry of sequence (included)
+     *
+     * @see #readEntries(long, long)
+     * @see #asyncReadUnconfirmedEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback,
java.lang.Object)
+     * @see #asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback,
java.lang.Object)
+     */
+    public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long lastEntry)
+            throws InterruptedException, BKException {
+        CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>();
+
+        asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncReadCallback(), counter);
+
+        return SynchCallbackUtils.waitForResult(counter);
+    }
+
+    /**
      * Read a sequence of entries asynchronously.
      *
      * @param firstEntry
@@ -511,6 +535,45 @@ public class LedgerHandle implements AutoCloseable {
         asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx);
     }
 
+    /**
+     * Read a sequence of entries asynchronously, allowing reads beyond the LastAddConfirmed
range.
+     * <br>This is the same as
+     * {@link #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback,
java.lang.Object) }
+     * but it lets the client read without checking the local value of LastAddConfirmed,
so that it is possible to
+     * read entries for which the writer has not received the acknowledgement yet. <br>
+     * For entries which are within the range 0..LastAddConfirmed BookKeeper guarantees that
the writer has successfully
+     * received the acknowledgement.<br>
+     * For entries outside that range it is possible that the writer never received the acknowledgement
+     * and so there is the risk that the reader is seeing entries before the writer and this
could result in a consistency
+     * issue in some cases.<br>
+     * With this method you can even read entries before the LastAddConfirmed and entries
after it with one call,
+     * the expected consistency will be as described above for each subrange of ids.
+     *
+     * @param firstEntry
+     *          id of first entry of sequence
+     * @param lastEntry
+     *          id of last entry of sequence
+     * @param cb
+     *          object implementing read callback interface
+     * @param ctx
+     *          control object
+     *
+     * @see #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback,
java.lang.Object)
+     * @see #asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback,
java.lang.Object)
+     * @see #readUnconfirmedEntries(long, long)
+     */
+    public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCallback
cb, Object ctx) {
+        // Little sanity check
+        if (firstEntry < 0 || firstEntry > lastEntry) {
+            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
+            cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx);
+            return;
+        }
+
+        asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx);
+    }
+
     void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, Object
ctx) {
         try {
             new PendingReadOp(this, bk.scheduler,

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9c79e078/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 2b75e9e..6b913d4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -1072,4 +1072,5 @@ public class ClientConfiguration extends AbstractConfiguration {
     public String getClientRole() {
         return getString(CLIENT_ROLE, CLIENT_ROLE_STANDARD);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9c79e078/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 0097028..17d63b3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -1,5 +1,6 @@
 package org.apache.bookkeeper.client;
 
+import java.util.Collections;
 import java.util.Enumeration;
 
 /*
@@ -432,5 +433,222 @@ public class BookKeeperTest extends BaseTestCase {
         rlh.close();
         wlh.close();
         bkcWithExplicitLAC.close();
-    }	
+    }
+
+    @Test(timeout = 60000)
+    public void testReadAfterLastAddConfirmed() throws Exception {
+
+        ClientConfiguration clientConfiguration = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString());
+
+        try (BookKeeper bkWriter = new BookKeeper(clientConfiguration);) {
+            LedgerHandle writeLh = bkWriter.createLedger(digestType, "testPasswd".getBytes());
+            long ledgerId = writeLh.getId();
+            int numOfEntries = 5;
+            for (int i = 0; i < numOfEntries; i++) {
+                writeLh.addEntry(("foobar" + i).getBytes());
+            }
+
+            try (BookKeeper bkReader = new BookKeeper(clientConfiguration);
+                LedgerHandle rlh = bkReader.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());)
{
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                Assert.assertFalse(writeLh.isClosed());
+
+                // with readUnconfirmedEntries we are able to read all of the entries
+                Enumeration<LedgerEntry> entries = rlh.readUnconfirmedEntries(0, numOfEntries
- 1);
+                int entryId = 0;
+                while (entries.hasMoreElements()) {
+                    LedgerEntry entry = entries.nextElement();
+                    String entryString = new String(entry.getEntry());
+                    Assert.assertTrue("Expected entry String: " + ("foobar" + entryId)
+                        + " actual entry String: " + entryString,
+                        entryString.equals("foobar" + entryId));
+                    entryId++;
+                }
+            }
+
+            try (BookKeeper bkReader = new BookKeeper(clientConfiguration);
+                LedgerHandle rlh = bkReader.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());)
{
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                Assert.assertFalse(writeLh.isClosed());
+
+                // without readUnconfirmedEntries we are not able to read all of the entries
+                try {
+                    rlh.readEntries(0, numOfEntries - 1);
+                    fail("shoud not be able to read up to "+ (numOfEntries - 1) + " with
readEntries");
+                } catch (BKException.BKReadException expected) {
+                }
+
+                // read all entries within the 0..LastAddConfirmed range with readEntries
+                assertEquals(rlh.getLastAddConfirmed() + 1,
+                    Collections.list(rlh.readEntries(0, rlh.getLastAddConfirmed())).size());
+
+                // assert local LAC does not change after reads
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                // read all entries within the 0..LastAddConfirmed range with readUnconfirmedEntries
+                assertEquals(rlh.getLastAddConfirmed() + 1,
+                    Collections.list(rlh.readUnconfirmedEntries(0, rlh.getLastAddConfirmed())).size());
+
+                // assert local LAC does not change after reads
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                // read all entries within the LastAddConfirmed..numOfEntries - 1 range with
readUnconfirmedEntries
+                assertEquals(numOfEntries - rlh.getLastAddConfirmed(),
+                    Collections.list(rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(),
numOfEntries - 1)).size());
+
+                // assert local LAC does not change after reads
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                try {
+                    // read all entries within the LastAddConfirmed..numOfEntries range 
with readUnconfirmedEntries
+                    // this is an error, we are going outside the range of existing entries
+                    rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(), numOfEntries);
+                    fail("the read tried to access data for unexisting entry id "+numOfEntries);
+                } catch (BKException.BKNoSuchEntryException expected) {
+                    // expecting a BKNoSuchEntryException, as the entry does not exist on
bookies
+                }
+
+                try {
+                    // read all entries within the LastAddConfirmed..numOfEntries range with
readEntries
+                    // this is an error, we are going outside the range of existing entries
+                    rlh.readEntries(rlh.getLastAddConfirmed(), numOfEntries);
+                    fail("the read tries to access data for unexisting entry id "+numOfEntries);
+                } catch (BKException.BKReadException expected) {
+                    // expecting a BKReadException, as the client rejected the request to
access entries
+                    // after local LastAddConfirmed
+                }
+
+            }
+
+            // ensure that after restarting every bookie entries are not lost
+            // even entries after the LastAddConfirmed
+            restartBookies();
+
+            try (BookKeeper bkReader = new BookKeeper(clientConfiguration);
+                LedgerHandle rlh = bkReader.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());)
{
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                Assert.assertFalse(writeLh.isClosed());
+
+                // with readUnconfirmedEntries we are able to read all of the entries
+                Enumeration<LedgerEntry> entries = rlh.readUnconfirmedEntries(0, numOfEntries
- 1);
+                int entryId = 0;
+                while (entries.hasMoreElements()) {
+                    LedgerEntry entry = entries.nextElement();
+                    String entryString = new String(entry.getEntry());
+                    Assert.assertTrue("Expected entry String: " + ("foobar" + entryId)
+                        + " actual entry String: " + entryString,
+                        entryString.equals("foobar" + entryId));
+                    entryId++;
+                }
+            }
+
+            try (BookKeeper bkReader = new BookKeeper(clientConfiguration);
+                LedgerHandle rlh = bkReader.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());)
{
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                Assert.assertFalse(writeLh.isClosed());
+
+                // without readUnconfirmedEntries we are not able to read all of the entries
+                try {
+                    rlh.readEntries(0, numOfEntries - 1);
+                    fail("shoud not be able to read up to "+ (numOfEntries - 1) + " with
readEntries");
+                } catch (BKException.BKReadException expected) {
+                }
+
+                // read all entries within the 0..LastAddConfirmed range with readEntries
+                assertEquals(rlh.getLastAddConfirmed() + 1,
+                    Collections.list(rlh.readEntries(0, rlh.getLastAddConfirmed())).size());
+
+                // assert local LAC does not change after reads
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                // read all entries within the 0..LastAddConfirmed range with readUnconfirmedEntries
+                assertEquals(rlh.getLastAddConfirmed() + 1,
+                    Collections.list(rlh.readUnconfirmedEntries(0, rlh.getLastAddConfirmed())).size());
+
+                // assert local LAC does not change after reads
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                // read all entries within the LastAddConfirmed..numOfEntries - 1 range with
readUnconfirmedEntries
+                assertEquals(numOfEntries - rlh.getLastAddConfirmed(),
+                    Collections.list(rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(),
numOfEntries - 1)).size());
+
+                // assert local LAC does not change after reads
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+                try {
+                    // read all entries within the LastAddConfirmed..numOfEntries range 
with readUnconfirmedEntries
+                    // this is an error, we are going outside the range of existing entries
+                    rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(), numOfEntries);
+                    fail("the read tried to access data for unexisting entry id "+numOfEntries);
+                } catch (BKException.BKNoSuchEntryException expected) {
+                    // expecting a BKNoSuchEntryException, as the entry does not exist on
bookies
+                }
+
+                try {
+                    // read all entries within the LastAddConfirmed..numOfEntries range with
readEntries
+                    // this is an error, we are going outside the range of existing entries
+                    rlh.readEntries(rlh.getLastAddConfirmed(), numOfEntries);
+                    fail("the read tries to access data for unexisting entry id "+numOfEntries);
+                } catch (BKException.BKReadException expected) {
+                    // expecting a BKReadException, as the client rejected the request to
access entries
+                    // after local LastAddConfirmed
+                }
+
+            }
+
+            // open ledger with fencing, this will repair the ledger and make the last entry
readable
+            try (BookKeeper bkReader = new BookKeeper(clientConfiguration);
+                LedgerHandle rlh = bkReader.openLedger(ledgerId, digestType, "testPasswd".getBytes());)
{
+                Assert.assertTrue(
+                    "Expected LAC of rlh: " + (numOfEntries - 1) + " actual LAC of rlh: "
+ rlh.getLastAddConfirmed(),
+                    (rlh.getLastAddConfirmed() == (numOfEntries - 1)));
+
+                Assert.assertFalse(writeLh.isClosed());
+
+                // after recovery, plain readEntries is able to read all of the entries
+                Enumeration<LedgerEntry> entries = rlh.readEntries(0, numOfEntries
- 1);
+                int entryId = 0;
+                while (entries.hasMoreElements()) {
+                    LedgerEntry entry = entries.nextElement();
+                    String entryString = new String(entry.getEntry());
+                    Assert.assertTrue("Expected entry String: " + ("foobar" + entryId)
+                        + " actual entry String: " + entryString,
+                        entryString.equals("foobar" + entryId));
+                    entryId++;
+                }
+            }
+
+            try {
+                writeLh.close();
+                fail("should not be able to close the first LedgerHandler as a recovery has
been performed");
+            } catch (BKException.BKMetadataVersionException expected) {
+            }
+
+        }
+    }
 }


Mime
View raw message