Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1841FE2F2 for ; Mon, 3 Dec 2012 09:46:47 +0000 (UTC) Received: (qmail 25780 invoked by uid 500); 3 Dec 2012 09:46:46 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 25493 invoked by uid 500); 3 Dec 2012 09:46:37 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 25428 invoked by uid 99); 3 Dec 2012 09:46:35 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Dec 2012 09:46:35 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Dec 2012 09:46:29 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 5D9B12388980; Mon, 3 Dec 2012 09:46:07 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1416393 - in /zookeeper/bookkeeper/trunk/bookkeeper-server/src: main/java/org/apache/bookkeeper/client/ main/java/org/apache/bookkeeper/conf/ test/java/org/apache/bookkeeper/client/ test/java/org/apache/bookkeeper/test/ Date: Mon, 03 Dec 2012 09:46:06 -0000 To: commits@zookeeper.apache.org From: ivank@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121203094607.5D9B12388980@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ivank Date: Mon Dec 3 09:46:05 2012 New Revision: 1416393 URL: http://svn.apache.org/viewvc?rev=1416393&view=rev Log: BOOKKEEPER-336 bookie readEntries is taking more time if the ensemble has failed bookie(s) Basic speculative functionality in place Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1416393&r1=1416392&r2=1416393&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Mon Dec 3 09:46:05 2012 @@ -21,11 +21,19 @@ package org.apache.bookkeeper.client; * */ import java.net.InetSocketAddress; -import java.util.ArrayDeque; +import java.util.concurrent.ArrayBlockingQueue; import java.util.ArrayList; import java.util.Enumeration; import java.util.NoSuchElementException; import java.util.Queue; +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.List; + +import java.util.Timer; +import java.util.TimerTask; + import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.BKException.BKDigestMatchException; @@ -45,7 +53,10 @@ import org.jboss.netty.buffer.ChannelBuf class PendingReadOp implements Enumeration, ReadEntryCallback { Logger LOG = LoggerFactory.getLogger(PendingReadOp.class); + final int speculativeReadTimeout; + Timer speculativeReadTimer; Queue seq; + Set heardFromHosts; ReadCallback cb; Object ctx; LedgerHandle lh; @@ -53,59 +64,135 @@ class PendingReadOp implements Enumerati long startEntryId; long endEntryId; - private class LedgerEntryRequest extends LedgerEntry { + class LedgerEntryRequest extends LedgerEntry { int nextReplicaIndexToReadFrom = 0; AtomicBoolean complete = new AtomicBoolean(false); int firstError = BKException.Code.OK; final ArrayList ensemble; + final List writeSet; + final BitSet sentReplicas; + final BitSet erroredReplicas; LedgerEntryRequest(ArrayList ensemble, long lId, long eId) { super(lId, eId); this.ensemble = ensemble; + this.writeSet = lh.distributionSchedule.getWriteSet(entryId); + this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + } + + private int getReplicaIndex(InetSocketAddress host) { + int bookieIndex = ensemble.indexOf(host); + if (bookieIndex == -1) { + return -1; + } + return writeSet.indexOf(bookieIndex); + } + + private BitSet getSentToBitSet() { + BitSet b = new BitSet(ensemble.size()); + + for (int i = 0; i < sentReplicas.length(); i++) { + if (sentReplicas.get(i)) { + b.set(writeSet.get(i)); + } + } + return b; + } + + private BitSet getHeardFromBitSet(Set heardFromHosts) { + BitSet b = new BitSet(ensemble.size()); + for (InetSocketAddress i : heardFromHosts) { + int index = ensemble.indexOf(i); + if (index != -1) { + b.set(index); + } + } + return b; + } + + private boolean readsOutstanding() { + return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0; + } + + /** + * Send to next replica speculatively, if required and possible. + * This returns the host we may have sent to for unit testing. + * @return host we sent to if we sent. null otherwise. + */ + synchronized InetSocketAddress maybeSendSpeculativeRead(Set heardFromHosts) { + if (nextReplicaIndexToReadFrom >= lh.getLedgerMetadata().getWriteQuorumSize()) { + return null; + } + + BitSet sentTo = getSentToBitSet(); + BitSet heardFrom = getHeardFromBitSet(heardFromHosts); + sentTo.and(heardFrom); + + // only send another read, if we have had no response at all (even for other entries) + // from any of the other bookies we have sent the request to + if (sentTo.cardinality() == 0) { + return sendNextRead(); + } else { + return null; + } } - void sendNextRead() { + synchronized InetSocketAddress sendNextRead() { if (nextReplicaIndexToReadFrom >= lh.metadata.getWriteQuorumSize()) { // we are done, the read has failed from all replicas, just fail the // read submitCallback(firstError); - return; + return null; } + int replica = nextReplicaIndexToReadFrom; int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom); nextReplicaIndexToReadFrom++; try { - sendReadTo(ensemble.get(bookieIndex), this); + InetSocketAddress to = ensemble.get(bookieIndex); + sendReadTo(to, this); + sentReplicas.set(replica); + return to; } catch (InterruptedException ie) { LOG.error("Interrupted reading entry " + this, ie); Thread.currentThread().interrupt(); submitCallback(BKException.Code.ReadException); + return null; } } - void logErrorAndReattemptRead(String errMsg, int rc) { + synchronized void logErrorAndReattemptRead(InetSocketAddress host, String errMsg, int rc) { if (firstError == BKException.Code.OK) { firstError = rc; } - int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom - 1); LOG.error(errMsg + " while reading entry: " + entryId + " ledgerId: " + lh.ledgerId + " from bookie: " - + ensemble.get(bookieIndex)); + + host); - sendNextRead(); + int replica = getReplicaIndex(host); + if (replica == -1) { + LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble); + return; + } + erroredReplicas.set(replica); + + if (!readsOutstanding()) { + sendNextRead(); + } } // return true if we managed to complete the entry - boolean complete(final ChannelBuffer buffer) { + boolean complete(InetSocketAddress host, final ChannelBuffer buffer) { ChannelBufferInputStream is; try { is = lh.macManager.verifyDigestAndReturnData(entryId, buffer); } catch (BKDigestMatchException e) { - logErrorAndReattemptRead("Mac mismatch", BKException.Code.DigestMatchException); + logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException); return false; } @@ -133,23 +220,40 @@ class PendingReadOp implements Enumerati } PendingReadOp(LedgerHandle lh, long startEntryId, long endEntryId, ReadCallback cb, Object ctx) { - - seq = new ArrayDeque((int) (endEntryId - startEntryId)); + seq = new ArrayBlockingQueue((int) ((endEntryId + 1) - startEntryId)); this.cb = cb; this.ctx = ctx; this.lh = lh; this.startEntryId = startEntryId; this.endEntryId = endEntryId; numPendingEntries = endEntryId - startEntryId + 1; + speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout(); + if (speculativeReadTimeout > 0) { + speculativeReadTimer = new Timer("SpeculativeRead-L"+lh.getId()+"-S"+startEntryId+"-E"+endEntryId); + } else { + speculativeReadTimer = null; + } + heardFromHosts = new HashSet(); } public void initiate() throws InterruptedException { long nextEnsembleChange = startEntryId, i = startEntryId; ArrayList ensemble = null; - do { - LOG.debug("Acquiring lock: {}", i); + if (speculativeReadTimer != null) { + speculativeReadTimer.schedule(new TimerTask() { + public void run() { + for (LedgerEntryRequest r : seq) { + if (!r.isComplete()) { + r.maybeSendSpeculativeRead(heardFromHosts); + } + } + } + }, speculativeReadTimeout, speculativeReadTimeout); + } + + do { if (i == nextEnsembleChange) { ensemble = lh.metadata.getEnsemble(i); nextEnsembleChange = lh.metadata.getNextEnsembleChange(i); @@ -162,16 +266,27 @@ class PendingReadOp implements Enumerati } while (i <= endEntryId); } + private static class ReadContext { + final InetSocketAddress to; + final LedgerEntryRequest entry; + + ReadContext(InetSocketAddress to, LedgerEntryRequest entry) { + this.to = to; + this.entry = entry; + } + } + void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws InterruptedException { lh.opCounterSem.acquire(); lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId, - this, entry); + this, new ReadContext(to, entry)); } @Override public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) { - final LedgerEntryRequest entry = (LedgerEntryRequest) ctx; + final ReadContext rctx = (ReadContext)ctx; + final LedgerEntryRequest entry = rctx.entry; lh.opCounterSem.release(); @@ -190,11 +305,13 @@ class PendingReadOp implements Enumerati } if (rc != BKException.Code.OK) { - entry.logErrorAndReattemptRead("Error: " + BKException.getMessage(rc), rc); + entry.logErrorAndReattemptRead(rctx.to, "Error: " + BKException.getMessage(rc), rc); return; } - if (entry.complete(buffer)) { + heardFromHosts.add(rctx.to); + + if (entry.complete(rctx.to, buffer)) { numPendingEntries--; } @@ -207,6 +324,9 @@ class PendingReadOp implements Enumerati } private void submitCallback(int code) { + if (speculativeReadTimer != null) { + speculativeReadTimer.cancel(); + } cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx); } public boolean hasMoreElements() { Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1416393&r1=1416392&r2=1416393&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Mon Dec 3 09:46:05 2012 @@ -43,6 +43,7 @@ public class ClientConfiguration extends // NIO Parameters protected final static String CLIENT_TCP_NODELAY = "clientTcpNoDelay"; protected final static String READ_TIMEOUT = "readTimeout"; + protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout"; // Number Woker Threads protected final static String NUM_WORKER_THREADS = "numWorkerThreads"; @@ -275,4 +276,39 @@ public class ClientConfiguration extends setProperty(NUM_WORKER_THREADS, numThreads); return this; } + + /** + * Get the period of time after which a speculative entry read should be triggered. + * A speculative entry read is sent to the next replica bookie before + * an error or response has been received for the previous entry read request. + * + * A speculative entry read is only sent if we have not heard from the current + * replica bookie during the entire read operation which may comprise of many entries. + * + * Speculative reads allow the client to avoid having to wait for the connect timeout + * in the case that a bookie has failed. It induces higher load on the network and on + * bookies. This should be taken into account before changing this configuration value. + * + * @see org.apache.bookkeeper.client.LedgerHandle#asyncReadEntries + * @return the speculative read timeout in milliseconds. Default 2000. + */ + public int getSpeculativeReadTimeout() { + return getInt(SPECULATIVE_READ_TIMEOUT, 2000); + } + + /** + * Set the speculative read timeout. A lower timeout will reduce read latency in the + * case of a failed bookie, while increasing the load on bookies and the network. + * + * The default is 2000 milliseconds. A value of 0 will disable speculative reads + * completely. + * + * @see #getSpeculativeReadTimeout() + * @param timeout the timeout value, in milliseconds + * @return client configuration + */ + public ClientConfiguration setSpeculativeReadTimeout(int timeout) { + setProperty(SPECULATIVE_READ_TIMEOUT, timeout); + return this; + } } Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java?rev=1416393&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java (added) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java Mon Dec 3 09:46:05 2012 @@ -0,0 +1,329 @@ +package org.apache.bookkeeper.client; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.junit.*; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; +import java.util.Enumeration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.CountDownLatch; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.test.BaseTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This unit test tests ledger fencing; + * + */ +public class TestSpeculativeRead extends BaseTestCase { + static Logger LOG = LoggerFactory.getLogger(TestSpeculativeRead.class); + + DigestType digestType; + byte[] passwd = "specPW".getBytes(); + + public TestSpeculativeRead(DigestType digestType) { + super(10); + this.digestType = digestType; + } + + long getLedgerToRead(int ensemble, int quorum) throws Exception { + byte[] data = "Data for test".getBytes(); + LedgerHandle l = bkc.createLedger(ensemble, quorum, digestType, passwd); + for (int i = 0; i < 10; i++) { + l.addEntry(data); + } + l.close(); + + return l.getId(); + } + + BookKeeper createClient(int specTimeout) throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setSpeculativeReadTimeout(specTimeout) + .setReadTimeout(30000); + conf.setZkServers(zkUtil.getZooKeeperConnectString()); + return new BookKeeper(conf); + } + + class LatchCallback implements ReadCallback { + CountDownLatch l = new CountDownLatch(1); + boolean success = false; + long startMillis = System.currentTimeMillis(); + long endMillis = Long.MAX_VALUE; + + public void readComplete(int rc, + LedgerHandle lh, + Enumeration seq, + Object ctx) { + endMillis = System.currentTimeMillis(); + LOG.debug("Got response {} {}", rc, getDuration()); + success = rc == BKException.Code.OK; + l.countDown(); + } + + long getDuration() { + return endMillis - startMillis; + } + + void expectSuccess(int milliseconds) throws Exception { + assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS)); + assertTrue(success); + } + + void expectFail(int milliseconds) throws Exception { + assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS)); + assertFalse(success); + } + + void expectTimeout(int milliseconds) throws Exception { + assertFalse(l.await(milliseconds, TimeUnit.MILLISECONDS)); + } + } + + /** + * Test basic speculative functionallity. + * - Create 2 clients with read timeout disabled, one with spec + * read enabled, the other not. + * - create ledger + * - sleep second bookie in ensemble + * - read first entry, both should find on first bookie. + * - read second bookie, spec client should find on bookie three, + * non spec client should hang. + */ + @Test + public void testSpeculativeRead() throws Exception { + long id = getLedgerToRead(3,2); + BookKeeper bknospec = createClient(0); // disabled + BookKeeper bkspec = createClient(2000); + + LedgerHandle lnospec = bknospec.openLedger(id, digestType, passwd); + LedgerHandle lspec = bkspec.openLedger(id, digestType, passwd); + + // sleep second bookie + CountDownLatch sleepLatch = new CountDownLatch(1); + InetSocketAddress second = lnospec.getLedgerMetadata().getEnsembles().get(0L).get(1); + sleepBookie(second, sleepLatch); + + try { + // read first entry, both go to first bookie, should be fine + LatchCallback nospeccb = new LatchCallback(); + LatchCallback speccb = new LatchCallback(); + lnospec.asyncReadEntries(0, 0, nospeccb, null); + lspec.asyncReadEntries(0, 0, speccb, null); + nospeccb.expectSuccess(2000); + speccb.expectSuccess(2000); + + // read second entry, both look for second book, spec read client + // tries third bookie, nonspec client hangs as read timeout is very long. + nospeccb = new LatchCallback(); + speccb = new LatchCallback(); + lnospec.asyncReadEntries(1, 1, nospeccb, null); + lspec.asyncReadEntries(1, 1, speccb, null); + speccb.expectSuccess(4000); + nospeccb.expectTimeout(4000); + } finally { + sleepLatch.countDown(); + lspec.close(); + lnospec.close(); + bkspec.close(); + bknospec.close(); + } + } + + /** + * Test that if more than one replica is down, we can still read, as long as the quorum + * size is larger than the number of down replicas. + */ + @Test + public void testSpeculativeReadMultipleReplicasDown() throws Exception { + long id = getLedgerToRead(5,5); + int timeout = 5000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + // sleep bookie 1, 2 & 4 + CountDownLatch sleepLatch = new CountDownLatch(1); + sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(1), sleepLatch); + sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(2), sleepLatch); + sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(4), sleepLatch); + + try { + // read first entry, should complete faster than timeout + // as bookie 0 has the entry + LatchCallback latch0 = new LatchCallback(); + l.asyncReadEntries(0, 0, latch0, null); + latch0.expectSuccess(timeout/2); + + // second should have to hit two timeouts (bookie 1 & 2) + // bookie 3 has the entry + LatchCallback latch1 = new LatchCallback(); + l.asyncReadEntries(1, 1, latch1, null); + latch1.expectTimeout(timeout); + latch1.expectSuccess(timeout*2); + assertTrue("should have taken longer than two timeouts, but less than 3", + latch1.getDuration() > timeout*2 + && latch1.getDuration() < timeout*3); + + // third should have to hit one timeouts (bookie 2) + // bookie 3 has the entry + LatchCallback latch2 = new LatchCallback(); + l.asyncReadEntries(2, 2, latch2, null); + latch2.expectTimeout(timeout/2); + latch2.expectSuccess(timeout); + assertTrue("should have taken longer than one timeout, but less than 2", + latch2.getDuration() > timeout + && latch2.getDuration() < timeout*2); + + // fourth should have no timeout + // bookie 3 has the entry + LatchCallback latch3 = new LatchCallback(); + l.asyncReadEntries(3, 3, latch3, null); + latch3.expectSuccess(timeout/2); + + // fifth should hit one timeout, (bookie 4) + // bookie 0 has the entry + LatchCallback latch4 = new LatchCallback(); + l.asyncReadEntries(4, 4, latch4, null); + latch4.expectTimeout(timeout/2); + latch4.expectSuccess(timeout); + assertTrue("should have taken longer than one timeout, but less than 2", + latch4.getDuration() > timeout + && latch4.getDuration() < timeout*2); + + } finally { + sleepLatch.countDown(); + l.close(); + bkspec.close(); + } + } + + /** + * Test that if after a speculative read is kicked off, the original read completes + * nothing bad happens. + */ + @Test + public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception { + long id = getLedgerToRead(2,2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + // sleep bookies + CountDownLatch sleepLatch0 = new CountDownLatch(1); + CountDownLatch sleepLatch1 = new CountDownLatch(1); + sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(0), sleepLatch0); + sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(1), sleepLatch1); + + try { + // read goes to first bookie, spec read timeout occurs, + // goes to second + LatchCallback latch0 = new LatchCallback(); + l.asyncReadEntries(0, 0, latch0, null); + latch0.expectTimeout(timeout); + + // wake up first bookie + sleepLatch0.countDown(); + latch0.expectSuccess(timeout/2); + + sleepLatch1.countDown(); + + // check we can read next entry without issue + LatchCallback latch1 = new LatchCallback(); + l.asyncReadEntries(1, 1, latch1, null); + latch1.expectSuccess(timeout/2); + + } finally { + sleepLatch0.countDown(); + sleepLatch1.countDown(); + l.close(); + bkspec.close(); + } + } + + /** + * Unit test for the speculative read scheduling method + */ + @Test + public void testSpeculativeReadScheduling() throws Exception { + long id = getLedgerToRead(3,2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + ArrayList ensemble = l.getLedgerMetadata().getEnsembles().get(0L); + Set allHosts = new HashSet(ensemble); + Set noHost = new HashSet(); + Set secondHostOnly = new HashSet(); + secondHostOnly.add(ensemble.get(1)); + try { + LatchCallback latch0 = new LatchCallback(); + PendingReadOp op = new PendingReadOp(l, 0, 5, latch0, null); + + // if we've already heard from all hosts, + // we only send the initial read + PendingReadOp.LedgerEntryRequest req0 + = op.new LedgerEntryRequest(ensemble, l.getId(), 0); + assertTrue("Should have sent to first", + req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0))); + assertNull("Should not have sent another", + req0.maybeSendSpeculativeRead(allHosts)); + + // if we have heard from some hosts, but not one we have sent to + // send again + PendingReadOp.LedgerEntryRequest req2 + = op.new LedgerEntryRequest(ensemble, l.getId(), 2); + assertTrue("Should have sent to third", + req2.maybeSendSpeculativeRead(noHost).equals(ensemble.get(2))); + assertTrue("Should have sent to first", + req2.maybeSendSpeculativeRead(secondHostOnly).equals(ensemble.get(0))); + + // if we have heard from some hosts, which includes one we sent to + // do not read again + PendingReadOp.LedgerEntryRequest req4 + = op.new LedgerEntryRequest(ensemble, l.getId(), 4); + assertTrue("Should have sent to second", + req4.maybeSendSpeculativeRead(noHost).equals(ensemble.get(1))); + assertNull("Should not have sent another", + req4.maybeSendSpeculativeRead(secondHostOnly)); + } finally { + // wait for all ops to complete + l.opCounterSem.acquire(bkspec.getConf().getThrottleValue()); + + l.close(); + bkspec.close(); + } + } +} \ No newline at end of file Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1416393&r1=1416392&r2=1416393&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Mon Dec 3 09:46:05 2012 @@ -234,7 +234,7 @@ public abstract class BookKeeperClusterT public CountDownLatch sleepBookie(InetSocketAddress addr, final int seconds) throws InterruptedException, IOException { final CountDownLatch l = new CountDownLatch(1); - final String name = "BookieJournal-" + addr.getPort(); + final String name = "NIOServerFactory-" + addr.getPort(); Thread[] allthreads = new Thread[Thread.activeCount()]; Thread.enumerate(allthreads); for (final Thread t : allthreads) { @@ -271,7 +271,7 @@ public abstract class BookKeeperClusterT */ public void sleepBookie(InetSocketAddress addr, final CountDownLatch l) throws InterruptedException, IOException { - final String name = "BookieJournal-" + addr.getPort(); + final String name = "NIOServerFactory-" + addr.getPort(); Thread[] allthreads = new Thread[Thread.activeCount()]; Thread.enumerate(allthreads); for (final Thread t : allthreads) {