bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-1093: Piggyback LAC on ReadResponse
Date Wed, 21 Jun 2017 17:58:04 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 90a8f2839 -> 90e4e46bc


BOOKKEEPER-1093: Piggyback LAC on ReadResponse

This change is based #178 - (you can review git sha 40ca8c2)

bookkeeper: LAC piggyback at read response

        - bookie server changes
          * cache maximum lac in file info
          * provide getLastAddConfirmed & setLastAddConfirmed in ledger storage
          * addEntry will set its lac thru setLastAddConfirmed
          * readEntry will read latest lac and piggyback
        - client change
          * check whether the response has lac piggybacked, if there is lac update lac in
its corresponding ledger handle.

Author: Sijie Guo <sijie@apache.org>
Author: Sijie Guo <sijieg@twitter.com>

Reviewers: Jia Zhai <None>, Matteo Merli <mmerli@apache.org>

This closes #180 from sijie/piggyback_lac


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/90e4e46b
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/90e4e46b
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/90e4e46b

Branch: refs/heads/master
Commit: 90e4e46bccdd1a788e10056f4a3818aa2aead452
Parents: 90a8f28
Author: Sijie Guo <sijie@apache.org>
Authored: Wed Jun 21 10:58:00 2017 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Wed Jun 21 10:58:00 2017 -0700

----------------------------------------------------------------------
 .../bookkeeper/client/LedgerRecoveryOp.java     | 12 +--
 .../apache/bookkeeper/client/PendingReadOp.java | 18 ++++-
 .../bookkeeper/conf/ClientConfiguration.java    |  1 +
 .../proto/PerChannelBookieClient.java           | 23 +++++-
 .../bookkeeper/client/TestPiggybackLAC.java     | 80 ++++++++++++++++++++
 .../bookkeeper/test/BookieClientTest.java       |  2 +-
 6 files changed, 123 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90e4e46b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index cc19dc9..97d1885 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -44,7 +44,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
 
     final LedgerHandle lh;
     final AtomicLong readCount, writeCount;
-    final AtomicBoolean readDone;
+    volatile boolean readDone;
     final AtomicBoolean callbackDone;
     volatile long startEntryToRead;
     volatile long endEntryToRead;
@@ -76,7 +76,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
     public LedgerRecoveryOp(LedgerHandle lh, GenericCallback<Void> cb) {
         readCount = new AtomicLong(0);
         writeCount = new AtomicLong(0);
-        readDone = new AtomicBoolean(false);
+        readDone = false;
         callbackDone = new AtomicBoolean(false);
         this.cb = cb;
         this.lh = lh;
@@ -185,7 +185,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
         }
 
         // we only trigger recovery add an entry when readDone == false && callbackDone
== false
-        if (!callbackDone.get() && !readDone.get() && rc == BKException.Code.OK)
{
+        if (!callbackDone.get() && !readDone && rc == BKException.Code.OK)
{
             readCount.incrementAndGet();
             byte[] data = entry.getEntry();
 
@@ -215,7 +215,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
 
         // no entry found. stop recovery procedure but wait until recovery add finished.
         if (rc == BKException.Code.NoSuchEntryException || rc == BKException.Code.NoSuchLedgerExistsException)
{
-            readDone.set(true);
+            readDone = true;
             if (readCount.get() == writeCount.get()) {
                 closeAndCallback();
             }
@@ -231,7 +231,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
             // we are here is because we successfully read an entry but readDone was already
set to true.
             // this would happen on recovery a ledger than has gaps in the tail.
             LOG.warn("Successfully read entry {} for ledger {}, but readDone is already {}",
-                     new Object[] { entry.getEntryId(), lh.getId(), readDone.get() });
+                     new Object[] { entry.getEntryId(), lh.getId(), readDone });
         }
         return;
     }
@@ -248,7 +248,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
             return;
         }
         long numAdd = writeCount.incrementAndGet();
-        if (readDone.get() && readCount.get() == numAdd) {
+        if (readDone && readCount.get() == numAdd) {
             closeAndCallback();
         }
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90e4e46b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 0192296..306c5dd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
@@ -489,14 +490,25 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback
{
         }
     }
 
-    private static class ReadContext {
+    private static class ReadContext implements ReadEntryCallbackCtx {
         final BookieSocketAddress to;
         final LedgerEntryRequest entry;
+        long lac = LedgerHandle.INVALID_ENTRY_ID;
 
         ReadContext(BookieSocketAddress to, LedgerEntryRequest entry) {
             this.to = to;
             this.entry = entry;
         }
+
+        @Override
+        public void setLastAddConfirmed(long lac) {
+            this.lac = lac;
+        }
+
+        @Override
+        public long getLastAddConfirmed() {
+            return lac;
+        }
     }
 
     void sendReadTo(BookieSocketAddress to, LedgerEntryRequest entry) throws InterruptedException
{
@@ -521,11 +533,13 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback
{
         heardFromHosts.add(rctx.to);
 
         if (entry.complete(rctx.to, buffer)) {
+            lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
             submitCallback(BKException.Code.OK);
         }
 
         if(numPendingEntries < 0)
-            LOG.error("Read too many values");
+            LOG.error("Read too many values for ledger {} : [{}, {}].", new Object[] { ledgerId,
+                    startEntryId, endEntryId });
     }
 
     protected void submitCallback(int code) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90e4e46b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 311fb82..8b02a80 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -1140,4 +1140,5 @@ public class ClientConfiguration extends AbstractConfiguration {
     public String getClientRole() {
         return getString(CLIENT_ROLE, CLIENT_ROLE_STANDARD);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90e4e46b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 047d76c..6c5d68f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -17,6 +17,8 @@
  */
 package org.apache.bookkeeper.proto;
 
+import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -68,6 +70,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
@@ -1163,7 +1166,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter
{
      * Called by netty when a message is received on a channel
      */
     @Override
-	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 
         if (msg instanceof BookieProtocol.Response) {
             BookieProtocol.Response response = (BookieProtocol.Response) msg;
@@ -1208,7 +1211,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter
{
                             if (readResponse.hasData()) {
                               data = readResponse.getData();
                             }
-                            handleReadResponse(ledgerId, entryId, status, data, completionValue);
+                            handleReadResponse(ledgerId, entryId, status, data, INVALID_ENTRY_ID,
completionValue);
                             break;
                         }
                         default:
@@ -1296,7 +1299,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter
{
                             if (readResponse.hasBody()) {
                                 buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
                             }
-                            handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(),
status, buffer, completionValue);
+                            long maxLAC = INVALID_ENTRY_ID;
+                            if (readResponse.hasMaxLAC()) {
+                                maxLAC = readResponse.getMaxLAC();
+                            }
+                            handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(),
status, buffer, maxLAC, completionValue);
                             break;
                         }
                         case WRITE_LAC: {
@@ -1405,7 +1412,12 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter
{
         glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(),
glac.ctx);
     }
 
-    void handleReadResponse(long ledgerId, long entryId, StatusCode status, ByteBuf buffer,
CompletionValue completionValue) {
+    void handleReadResponse(long ledgerId,
+                            long entryId,
+                            StatusCode status,
+                            ByteBuf buffer,
+                            long maxLAC, // max known lac piggy-back from bookies
+                            CompletionValue completionValue) {
         // The completion value should always be an instance of a ReadCompletion object when
we reach here.
         ReadCompletion rc = (ReadCompletion)completionValue;
 
@@ -1426,6 +1438,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter
{
         if(buffer != null) {
             buffer = buffer.slice();
         }
+        if (maxLAC > INVALID_ENTRY_ID && (rc.ctx instanceof ReadEntryCallbackCtx))
{
+            ((ReadEntryCallbackCtx) rc.ctx).setLastAddConfirmed(maxLAC);
+        }
         rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer, rc.ctx);
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90e4e46b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java
new file mode 100644
index 0000000..9da6f05
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Enumeration;
+
+public class TestPiggybackLAC extends BookKeeperClusterTestCase {
+
+    static Logger LOG = LoggerFactory.getLogger(TestPiggybackLAC.class);
+
+    final DigestType digestType;
+
+    public TestPiggybackLAC() {
+        super(3);
+        this.digestType = DigestType.CRC32;
+    }
+
+    @Test(timeout = 60000)
+    public void testPiggybackLAC() throws Exception {
+        int numEntries = 10;
+        LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, "".getBytes());
+        // tried to add entries
+        for (int i=0; i<numEntries; i++) {
+            lh.addEntry(("data" + i).getBytes());
+            LOG.info("Added entry {}.", i);
+        }
+        LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
+        long lastLAC = readLh.getLastAddConfirmed();
+        assertEquals(numEntries - 2, lastLAC);
+        // write add entries
+        for (int i=0; i<numEntries; i++) {
+            lh.addEntry(("data" + (i + numEntries)).getBytes());
+            LOG.info("Added entry {}.", (i + numEntries));
+        }
+        int numReads = 0;
+        int i = 0;
+        while (true) {
+            if (i > readLh.getLastAddConfirmed()) {
+                break;
+            }
+            Enumeration<LedgerEntry> data = readLh.readEntries(i, i);
+            while (data.hasMoreElements()) {
+                LedgerEntry entry = data.nextElement();
+                assertEquals("data" + i, new String(entry.getEntry()));
+                ++numReads;
+            }
+            i++;
+        }
+        assertEquals(2 * numEntries - 1, numReads);
+        assertEquals(2 * numEntries - 2, readLh.getLastAddConfirmed());
+        readLh.close();
+        lh.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90e4e46b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 3d9a540..fe371e5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -231,7 +231,7 @@ public class BookieClientTest {
     }
 
     private ByteBuf createByteBuffer(int i, long lid, long eid) {
-        ByteBuf bb = Unpooled.buffer(4 + 16);
+        ByteBuf bb = Unpooled.buffer(4 + 24);
         bb.writeLong(lid);
         bb.writeLong(eid);
         bb.writeLong(eid - 1);


Mime
View raw message