bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-868: Add ADD_ENTRY quorum timeout (Leigh Stewart via sijie)
Date Tue, 06 Oct 2015 08:48:57 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 6cfecea6c -> 48ab23ef3


BOOKKEEPER-868: Add ADD_ENTRY quorum timeout (Leigh Stewart via sijie)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/48ab23ef
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/48ab23ef
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/48ab23ef

Branch: refs/heads/master
Commit: 48ab23ef3e16f8edfda2a977f9c0fc99c441db40
Parents: 6cfecea
Author: Sijie Guo <sijie@apache.org>
Authored: Tue Oct 6 01:48:46 2015 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Tue Oct 6 01:48:46 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/bookkeeper/client/BKException.java   |  11 ++
 .../apache/bookkeeper/client/PendingAddOp.java  |  51 ++++++-
 .../bookkeeper/conf/ClientConfiguration.java    |  24 ++++
 .../apache/bookkeeper/proto/BookieClient.java   |   6 +
 .../client/TestAddEntryQuorumTimeout.java       | 144 +++++++++++++++++++
 .../test/BookKeeperClusterTestCase.java         |   2 +-
 7 files changed, 233 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 14a9bed..d626197 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -104,6 +104,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Tong Yu via sijie)
 
+        BOOKKEEPER-868: Add ADD_ENTRY quorum timeout (Leigh Stewart via sijie)
+
       bookkeeper-server:
 
         BOOKKEEPER-695: Some entry logs are not removed from the bookie storage (Matteo Merli
via sijie)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index b43506d..3991085 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -94,6 +94,8 @@ public abstract class BKException extends Exception {
             return new BKClientClosedException();
         case Code.IllegalOpException:
             return new BKIllegalOpException();
+        case Code.AddEntryQuorumTimeoutException:
+            return new BKAddEntryQuorumTimeoutException();
         default:
             return new BKUnexpectedConditionException();
         }
@@ -125,6 +127,7 @@ public abstract class BKException extends Exception {
         int MetaStoreException = -18;
         int ClientClosedException = -19;
         int LedgerExistException = -20;
+        int AddEntryQuorumTimeoutException = -21;
 
         int IllegalOpException = -100;
         int LedgerFencedException = -101;
@@ -203,6 +206,8 @@ public abstract class BKException extends Exception {
             return "BookKeeper client is closed";
         case Code.IllegalOpException:
             return "Invalid operation";
+        case Code.AddEntryQuorumTimeoutException:
+            return "Add entry quorum wait timed out";
         default:
             return "Unexpected condition";
         }
@@ -250,6 +255,12 @@ public abstract class BKException extends Exception {
         }
     }
 
+    public static class BKAddEntryQuorumTimeoutException extends BKException {
+        public BKAddEntryQuorumTimeoutException() {
+            super(Code.AddEntryQuorumTimeoutException);
+        }
+    }
+
     public static class BKUnexpectedConditionException extends BKException {
         public BKUnexpectedConditionException() {
             super(Code.UnexpectedConditionException);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index fb87677..4034c35 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -27,9 +27,14 @@ import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This represents a pending add operation. When it has got success from all
@@ -40,7 +45,7 @@ import org.slf4j.LoggerFactory;
  *
  *
  */
-class PendingAddOp implements WriteCallback {
+class PendingAddOp implements WriteCallback, TimerTask {
     private final static Logger LOG = LoggerFactory.getLogger(PendingAddOp.class);
 
     ChannelBuffer toSend;
@@ -56,6 +61,10 @@ class PendingAddOp implements WriteCallback {
     LedgerHandle lh;
     boolean isRecoveryAdd = false;
     long requestTimeNanos;
+
+    final int timeoutSec;
+    Timeout timeout = null;
+
     OpStatsLogger addOpLogger;
 
     PendingAddOp(LedgerHandle lh, AddCallback cb, Object ctx) {
@@ -63,10 +72,9 @@ class PendingAddOp implements WriteCallback {
         this.cb = cb;
         this.ctx = ctx;
         this.entryId = LedgerHandle.INVALID_ENTRY_ID;
-
-        ackSet = lh.distributionSchedule.getAckSet();
-
-        addOpLogger = lh.bk.getAddOpLogger();
+        this.ackSet = lh.distributionSchedule.getAckSet();
+        this.addOpLogger = lh.bk.getAddOpLogger();
+        this.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout();
     }
 
     /**
@@ -90,6 +98,31 @@ class PendingAddOp implements WriteCallback {
                 this, bookieIndex, flags);
     }
 
+    @Override
+    public void run(Timeout timeout) {
+        timeoutQuorumWait();
+    }
+
+    void timeoutQuorumWait() {
+        try {
+            lh.bk.mainWorkerPool.submitOrdered(lh.ledgerId, new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    if (completed) {
+                        return;
+                    }
+                    lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException);
+                }
+                @Override
+                public String toString() {
+                    return String.format("AddEntryQuorumTimeout(lid=%d, eid=%d)", lh.ledgerId,
entryId);
+                }
+            });
+        } catch (RejectedExecutionException e) {
+            LOG.warn("Timeout add entry quorum wait failed {} entry: {}", lh.ledgerId, entryId);
+        }
+    }
+
     void unsetSuccessAndSendWriteRequest(int bookieIndex) {
         if (toSend == null) {
             // this addOp hasn't yet had its mac computed. When the mac is
@@ -131,7 +164,10 @@ class PendingAddOp implements WriteCallback {
     }
 
     void initiate(ChannelBuffer toSend, int entryLength) {
-        requestTimeNanos = MathUtils.nowInNano();
+        if (timeoutSec > -1) {
+            this.timeout = lh.bk.bookieClient.scheduleTimeout(this, timeoutSec, TimeUnit.SECONDS);
+        }
+        this.requestTimeNanos = MathUtils.nowInNano();
         this.toSend = toSend;
         this.entryLength = entryLength;
         for (int bookieIndex : writeSet) {
@@ -190,6 +226,9 @@ class PendingAddOp implements WriteCallback {
     }
 
     void submitCallback(final int rc) {
+        if (null != timeout) {
+            timeout.cancel();
+        }
         long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
         if (rc != BKException.Code.OK) {
             addOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index dde6d3a..fdbfd53 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -59,6 +59,7 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
     // Timeout Setting
     protected final static String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec";
+    protected final static String ADD_ENTRY_QUORUM_TIMEOUT_SEC = "addEntryQuorumTimeoutSec";
     protected final static String READ_ENTRY_TIMEOUT_SEC = "readEntryTimeoutSec";
     protected final static String TIMEOUT_TASK_INTERVAL_MILLIS = "timeoutTaskIntervalMillis";
     protected final static String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = "pcbcTimeoutTimerTickDurationMs";
@@ -434,6 +435,29 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get the timeout for top-level add request. That is, the amount of time we should spend
+     * waiting for ack quorum.
+     *
+     * @return add entry ack quorum timeout.
+     */
+    public int getAddEntryQuorumTimeout() {
+        return getInt(ADD_ENTRY_QUORUM_TIMEOUT_SEC, -1);
+    }
+
+    /**
+     * Set timeout for top-level add entry request.
+     * @see #getAddEntryQuorumTimeout()
+     *
+     * @param timeout
+     *          The new add entry ack quorum timeout in seconds.
+     * @return client configuration.
+     */
+    public ClientConfiguration setAddEntryQuorumTimeout(int timeout) {
+        setProperty(ADD_ENTRY_QUORUM_TIMEOUT_SEC, timeout);
+        return this;
+    }
+
+    /**
      * Get the timeout for read entry. This is the number of seconds we wait without hearing
      * a response for read entry request from a bookie before we consider it failed. By default,
      * we use socket timeout specified at {@link #getReadTimeout()}.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 87d1865..909cdd0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -47,6 +47,8 @@ import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -248,6 +250,10 @@ public class BookieClient implements PerChannelBookieClientFactory {
         return closed;
     }
 
+    public Timeout scheduleTimeout(TimerTask task, long timeoutSec, TimeUnit timeUnit) {
+        return requestTimer.newTimeout(task, timeoutSec, timeUnit);
+    }
+
     public void close() {
         closeLock.writeLock().lock();
         try {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java
new file mode 100644
index 0000000..da62fe0
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class TestAddEntryQuorumTimeout extends BookKeeperClusterTestCase implements AddCallback
{
+
+    final static Logger logger = LoggerFactory.getLogger(TestAddEntryQuorumTimeout.class);
+
+    final DigestType digestType;
+    final byte[] testPasswd = "".getBytes();
+
+    public TestAddEntryQuorumTimeout() {
+        super(3);
+        baseClientConf.setAddEntryTimeout(10);
+        baseClientConf.setAddEntryQuorumTimeout(1);
+        this.digestType = DigestType.CRC32;
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+    }
+
+    private static class SyncObj {
+        volatile int counter = 0;
+        volatile int rc = -1;
+        public SyncObj() {
+            counter = 0;
+        }
+    }
+
+    @Override
+    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+        SyncObj x = (SyncObj) ctx;
+        synchronized (x) {
+            x.rc = rc;
+            x.counter++;
+            x.notify();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testBasicTimeout() throws Exception {
+        BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf);
+        LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd);
+        List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+        byte[] data = "foobar".getBytes();
+        lh.addEntry(data);
+        sleepBookie(curEns.get(0), 5).await();
+        try {
+            lh.addEntry(data);
+            Assert.fail("should have thrown");
+        } catch (BKException.BKAddEntryQuorumTimeoutException ex) {
+        }
+    }
+
+    private void waitForSyncObj(SyncObj syncObj) throws Exception {
+        synchronized (syncObj) {
+            while (syncObj.counter < 1) {
+                logger.debug("Entries counter = " + syncObj.counter);
+                syncObj.wait();
+            }
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testTimeoutWithPendingOps() throws Exception {
+        BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf);
+        LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd);
+        List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+        byte[] data = "foobar".getBytes();
+
+        SyncObj syncObj1 = new SyncObj();
+        SyncObj syncObj2 = new SyncObj();
+        SyncObj syncObj3 = new SyncObj();
+
+        lh.addEntry(data);
+        sleepBookie(curEns.get(0), 5).await();
+        lh.asyncAddEntry(data, this, syncObj1);
+        lh.asyncAddEntry(data, this, syncObj2);
+        lh.asyncAddEntry(data, this, syncObj3);
+
+        waitForSyncObj(syncObj1);
+        Assert.assertEquals(BKException.Code.AddEntryQuorumTimeoutException, syncObj1.rc);
+        waitForSyncObj(syncObj2);
+        Assert.assertEquals(BKException.Code.AddEntryQuorumTimeoutException, syncObj2.rc);
+        waitForSyncObj(syncObj3);
+        Assert.assertEquals(BKException.Code.AddEntryQuorumTimeoutException, syncObj3.rc);
+    }
+
+    @Test(timeout = 60000)
+    public void testLedgerClosedAfterTimeout() throws Exception {
+        BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf);
+        LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd);
+        List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+        byte[] data = "foobar".getBytes();
+        CountDownLatch b0latch = sleepBookie(curEns.get(0), 5);
+        try {
+            lh.addEntry(data);
+            Assert.fail("should have thrown");
+        } catch (BKException.BKAddEntryQuorumTimeoutException ex) {
+        }
+        b0latch.await();
+        try {
+            lh.addEntry(data);
+            Assert.fail("should have thrown");
+        } catch (BKException.BKLedgerClosedException ex) {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index ced2c9f..fce689d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -275,7 +275,7 @@ public abstract class BookKeeperClusterTestCase {
      *          Socket Address
      * @param seconds
      *          Sleep seconds
-     * @return Count Down latch which will be counted down when sleep finishes
+     * @return Count Down latch which will be counted down just after sleep begins
      * @throws InterruptedException
      * @throws IOException
      */


Mime
View raw message