Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 432519EDA for ; Thu, 7 Jun 2012 06:38:14 +0000 (UTC) Received: (qmail 2574 invoked by uid 500); 7 Jun 2012 06:38:14 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 2526 invoked by uid 500); 7 Jun 2012 06:38:12 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 2489 invoked by uid 99); 7 Jun 2012 06:38:11 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Jun 2012 06:38:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Jun 2012 06:38:08 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 741FA23889E0 for ; Thu, 7 Jun 2012 06:37:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1347417 - in /zookeeper/bookkeeper/tags/release-4.1.0: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/uti... Date: Thu, 07 Jun 2012 06:37:47 -0000 To: commits@zookeeper.apache.org From: sijie@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120607063748.741FA23889E0@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: sijie Date: Thu Jun 7 06:37:46 2012 New Revision: 1347417 URL: http://svn.apache.org/viewvc?rev=1347417&view=rev Log: svn merge -c 1346253,1346258,1346328,1346343,1346976 https://svn.apache.org/repos/asf/zookeeper/bookkeeper/branches/branch-4.1 Added: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java - copied unchanged from r1346253, zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java Modified: zookeeper/bookkeeper/tags/release-4.1.0/ (props changed) zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java zookeeper/bookkeeper/tags/release-4.1.0/pom.xml Propchange: zookeeper/bookkeeper/tags/release-4.1.0/ ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Thu Jun 7 06:37:46 2012 @@ -0,0 +1 @@ +/zookeeper/bookkeeper/branches/branch-4.1:1346253,1346258,1346328,1346343,1346976 Modified: zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt?rev=1347417&r1=1347416&r2=1347417&view=diff ============================================================================== --- zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt (original) +++ zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt Thu Jun 7 06:37:46 2012 @@ -120,6 +120,14 @@ Release 4.1.0 - 2012-05-31 BOOKKEEPER-273: LedgerHandle.deleteLedger() should be idempotent (Matteo Merli via ivank) + BOOKKEEPER-281: BKClient is failing when zkclient connection delays (ivank via sijie) + + BOOKKEEPER-279: LocalBookKeeper is failing intermittently due to zkclient connection establishment delay (Rakesh R via sijie) + + BOOKKEEPER-286: Compilation warning (ivank via sijie) + + BOOKKEEPER-287: NoSuchElementException in LedgerCacheImpl (sijie) + hedwig-client/ BOOKKEEPER-217: NPE in hedwig client when enable DEBUG (sijie via ivank) @@ -146,6 +154,8 @@ Release 4.1.0 - 2012-05-31 BOOKKEEPER-146: TestConcurrentTopicAcquisition sometimes hangs (ivank) + BOOKKEEPER-285: TestZkSubscriptionManager quits due to NPE, so other tests are not run in hedwig server. (sijie) + bookkeeper-benchmark/ BOOKKEEPER-207: BenchBookie doesn't run correctly (ivank via fpj) Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1347417&r1=1347416&r2=1347417&view=diff ============================================================================== --- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original) +++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Thu Jun 7 06:37:46 2012 @@ -176,7 +176,7 @@ public class LedgerCacheImpl implements } } catch (IOException ie) { // if we grab a clean page, but failed to update the page - // we are exhuasting the count of ledger entry pages. + // we are exhausting the count of ledger entry pages. // since this page will be never used, so we need to decrement // page count of ledger cache. lep.releasePage(); @@ -317,7 +317,7 @@ public class LedgerCacheImpl implements if (!doAll) { break; } - // Yeild. if we are doing all the ledgers we don't want to block other flushes that + // Yield. if we are doing all the ledgers we don't want to block other flushes that // need to happen try { dirtyLedgers.wait(1); @@ -449,22 +449,22 @@ public class LedgerCacheImpl implements if (entry % entriesPerPage != 0) { throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage); } - synchronized(this) { - if (pageCount < pageLimit) { - // let's see if we can allocate something - LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage); - lep.setLedger(ledger); - lep.setFirstEntry(entry); - - // note, this will not block since it is a new page - lep.usePage(); - pageCount++; - return lep; - } - } - outerLoop: while(true) { + synchronized(this) { + if (pageCount < pageLimit) { + // let's see if we can allocate something + LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage); + lep.setLedger(ledger); + lep.setFirstEntry(entry); + + // note, this will not block since it is a new page + lep.usePage(); + pageCount++; + return lep; + } + } + synchronized(cleanLedgers) { if (cleanLedgers.isEmpty()) { flushLedger(false); @@ -475,6 +475,14 @@ public class LedgerCacheImpl implements } } synchronized(this) { + // if ledgers deleted between checking pageCount and putting + // ledgers into cleanLedgers list, the cleanLedgers list would be empty. + // so give it a chance to go back to check pageCount again because + // deleteLedger would decrement pageCount to return the number of pages + // occupied by deleted ledgers. + if (cleanLedgers.isEmpty()) { + continue outerLoop; + } Long cleanLedger = cleanLedgers.getFirst(); Map map = pages.get(cleanLedger); while (map == null || map.isEmpty()) { @@ -610,7 +618,13 @@ public class LedgerCacheImpl implements // remove pages first to avoid page flushed when deleting file info synchronized(this) { - pages.remove(ledgerId); + Map lpages = pages.remove(ledgerId); + if (null != lpages) { + pageCount -= lpages.size(); + if (pageCount < 0) { + LOG.error("Page count of ledger cache has been decremented to be less than zero."); + } + } } // Delete the ledger's index file and close the FileInfo FileInfo fi = null; Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1347417&r1=1347416&r2=1347417&view=diff ============================================================================== --- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original) +++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Thu Jun 7 06:37:46 2012 @@ -119,27 +119,35 @@ public class BookKeeper { * @throws InterruptedException * @throws KeeperException */ - public BookKeeper(ClientConfiguration conf) + public BookKeeper(final ClientConfiguration conf) throws IOException, InterruptedException, KeeperException { this.conf = conf; + + final CountDownLatch zkConnectLatch = new CountDownLatch(1); this.zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher() { @Override public void process(WatchedEvent event) { - if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) { - connectLatch.countDown(); - } + // countdown the latch on all events, even if we haven't + // successfully connected. + zkConnectLatch.countDown(); + // TODO: handle session disconnects and expires LOG.debug("Process: {} {}", event.getType(), event.getPath()); } }); + if (!zkConnectLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS) + || !zk.getState().isConnected()) { + throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); + } + this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); - bookieWatcher = new BookieWatcher(conf, this); - bookieWatcher.readBookiesBlocking(); mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads()); bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool); - // initialize ledger meta manager + bookieWatcher = new BookieWatcher(conf, this); + bookieWatcher.readBookiesBlocking(); + ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk); ownChannelFactory = true; @@ -176,49 +184,33 @@ public class BookKeeper { * {@link ClientConfiguration} * @param zk * Zookeeper client instance connected to the zookeeper with which - * the bookies have registered + * the bookies have registered. The ZooKeeper client must be connected + * before it is passed to BookKeeper. Otherwise a KeeperException is thrown. * @param channelFactory * A factory that will be used to create connections to the bookies * @throws IOException * @throws InterruptedException - * @throws KeeperException + * @throws KeeperException if the passed zk handle is not connected */ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory) throws IOException, InterruptedException, KeeperException { if (zk == null || channelFactory == null) { throw new NullPointerException(); } + if (!zk.getState().isConnected()) { + LOG.error("Unconnected zookeeper handle passed to bookkeeper"); + throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); + } this.conf = conf; this.zk = zk; this.channelFactory = channelFactory; - bookieWatcher = new BookieWatcher(conf, this); - bookieWatcher.readBookiesBlocking(); + mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads()); bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool); - // initialize ledger meta manager - ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk); - } + bookieWatcher = new BookieWatcher(conf, this); + bookieWatcher.readBookiesBlocking(); - void withZKConnected(final ZKConnectCallback cb) { - if (ownZKHandle) { - mainWorkerPool.submit(new SafeRunnable() { - @Override - public void safeRun() { - try { - if (!connectLatch.await(zkConnectTimeoutMs, TimeUnit.MILLISECONDS)) { - cb.connectionFailed(BKException.Code.ZKException); - } else { - cb.connected(); - } - } catch (InterruptedException ie) { - // someone trying to kill the process - cb.connectionFailed(BKException.Code.InterruptedException); - } - } - }); - } else { - cb.connected(); - } + ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk); } LedgerManager getLedgerManager() { @@ -278,15 +270,8 @@ public class BookKeeper { */ public void asyncCreateLedger(final int ensSize, final int qSize, final DigestType digestType, final byte[] passwd, final CreateCallback cb, final Object ctx) { - withZKConnected(new ZKConnectCallback() { - public void connected() { - new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx) - .initiate(); - } - public void connectionFailed(int code) { - cb.createComplete(code, null, ctx); - } - }); + new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx) + .initiate(); } @@ -370,14 +355,7 @@ public class BookKeeper { */ public void asyncOpenLedger(final long lId, final DigestType digestType, final byte passwd[], final OpenCallback cb, final Object ctx) { - withZKConnected(new ZKConnectCallback() { - public void connected() { - new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate(); - } - public void connectionFailed(int code) { - cb.openComplete(code, null, ctx); - } - }); + new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate(); } /** @@ -409,14 +387,7 @@ public class BookKeeper { */ public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestType, final byte passwd[], final OpenCallback cb, final Object ctx) { - withZKConnected(new ZKConnectCallback() { - public void connected() { - new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery(); - } - public void connectionFailed(int code) { - cb.openComplete(code, null, ctx); - } - }); + new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery(); } @@ -502,14 +473,7 @@ public class BookKeeper { * optional control object */ public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Object ctx) { - withZKConnected(new ZKConnectCallback() { - public void connected() { - new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate(); - } - public void connectionFailed(int code) { - cb.deleteComplete(code, ctx); - } - }); + new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate(); } Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1347417&r1=1347416&r2=1347417&view=diff ============================================================================== --- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original) +++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Thu Jun 7 06:37:46 2012 @@ -30,6 +30,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -127,18 +129,25 @@ public class BookKeeperAdmin { */ public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException { // Create the ZooKeeper client instance + final CountDownLatch latch = new CountDownLatch(1); zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher() { @Override public void process(WatchedEvent event) { + latch.countDown(); if (LOG.isDebugEnabled()) { LOG.debug("Process: " + event.getType() + " " + event.getPath()); } } }); + if (!latch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS) + || !zk.getState().isConnected()) { + throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); + } // Create the bookie path bookiesPath = conf.getZkAvailableBookiesPath(); // Create the BookKeeper client instance - bkc = new BookKeeper(conf); + bkc = new BookKeeper(conf, zk); + DIGEST_TYPE = conf.getBookieRecoveryDigestType(); PASSWD = conf.getBookieRecoveryPasswd(); } Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1347417&r1=1347416&r2=1347417&view=diff ============================================================================== --- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original) +++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Thu Jun 7 06:37:46 2012 @@ -115,10 +115,10 @@ class BookieWatcher implements Watcher, newBookieAddrs.add(bookieAddr); } - HashSet deadBookies = (HashSet)knownBookies.clone(); - deadBookies.removeAll(newBookieAddrs); - + final HashSet deadBookies; synchronized (this) { + deadBookies = (HashSet)knownBookies.clone(); + deadBookies.removeAll(newBookieAddrs); knownBookies = newBookieAddrs; } Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java?rev=1347417&r1=1347416&r2=1347417&view=diff ============================================================================== --- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java (original) +++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java Thu Jun 7 06:37:46 2012 @@ -26,23 +26,20 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookieServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; @@ -68,6 +65,7 @@ public class LocalBookKeeper { ZooKeeperServer zks; ZooKeeper zkc; int ZooKeeperDefaultPort = 2181; + static int zkSessionTimeOut = 5000; File ZkTmpDir; //BookKeeper variables @@ -104,11 +102,14 @@ public class LocalBookKeeper { LOG.debug("ZooKeeper server up: " + b); } - private void initializeZookeper() { + private void initializeZookeper() throws IOException { LOG.info("Instantiate ZK Client"); //initialize the zk client with values try { - zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher()); + ZKConnectionWatcher zkConnectionWatcher = new ZKConnectionWatcher(); + zkc = new ZooKeeper(HOSTPORT, zkSessionTimeOut, + zkConnectionWatcher); + zkConnectionWatcher.waitForConnection(); zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // No need to create an entry for each requested bookie anymore as the @@ -119,9 +120,6 @@ public class LocalBookKeeper { } catch (InterruptedException e) { // TODO Auto-generated catch block LOG.error("Interrupted while creating znodes", e); - } catch (IOException e) { - // TODO Auto-generated catch block - LOG.error("Exception while creating znodes", e); } } private void runBookies(ServerConfiguration baseConf) @@ -184,9 +182,30 @@ public class LocalBookKeeper { System.err.println("Usage: LocalBookKeeper number-of-bookies"); } - /* User for testing purposes, void */ - static class emptyWatcher implements Watcher { - public void process(WatchedEvent event) {} + /* Watching SyncConnected event from ZooKeeper */ + static class ZKConnectionWatcher implements Watcher { + private CountDownLatch clientConnectLatch = new CountDownLatch(1); + + @Override + public void process(WatchedEvent event) { + if (event.getState() == KeeperState.SyncConnected) { + clientConnectLatch.countDown(); + } + } + + // Waiting for the SyncConnected event from the ZooKeeper server + public void waitForConnection() throws IOException { + try { + if (!clientConnectLatch.await(zkSessionTimeOut, + TimeUnit.MILLISECONDS)) { + throw new IOException( + "Couldn't connect to zookeeper server"); + } + } catch (InterruptedException e) { + throw new IOException( + "Interrupted when connecting to zookeeper server", e); + } + } } public static boolean waitForServerUp(String hp, long timeout) { Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1347417&r1=1347416&r2=1347417&view=diff ============================================================================== --- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original) +++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Thu Jun 7 06:37:46 2012 @@ -126,6 +126,41 @@ public class LedgerCacheTest extends Tes } @Test + public void testDeleteLedger() throws Exception { + int numEntries = 10; + // limit open files & pages + conf.setOpenFileLimit(999).setPageLimit(2) + .setPageSize(8 * numEntries); + // create ledger cache + newLedgerCache(); + try { + int numLedgers = 2; + byte[] masterKey = "blah".getBytes(); + for (int i=1; i<=numLedgers; i++) { + ledgerCache.setMasterKey((long)i, masterKey); + for (int j=0; j afterSet = new HashSet(); Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1347417&r1=1347416&r2=1347417&view=diff ============================================================================== --- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original) +++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Thu Jun 7 06:37:46 2012 @@ -22,6 +22,8 @@ package org.apache.bookkeeper.test; import java.io.File; +import java.io.IOException; + import java.net.InetSocketAddress; import org.apache.commons.io.FileUtils; @@ -115,6 +117,31 @@ public class ZooKeeperUtil { zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } + public void sleepServer(final int seconds, final CountDownLatch l) + throws InterruptedException, IOException { + Thread[] allthreads = new Thread[Thread.activeCount()]; + Thread.enumerate(allthreads); + for (final Thread t : allthreads) { + if (t.getName().contains("SyncThread:0")) { + Thread sleeper = new Thread() { + public void run() { + try { + t.suspend(); + l.countDown(); + Thread.sleep(seconds*1000); + t.resume(); + } catch (Exception e) { + LOG.error("Error suspending thread", e); + } + } + }; + sleeper.start(); + return; + } + } + throw new IOException("ZooKeeper thread not found"); + } + public void killServer() throws Exception { if (zkc != null) { zkc.close(); Modified: zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java?rev=1347417&r1=1347416&r2=1347417&view=diff ============================================================================== --- zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java (original) +++ zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java Thu Jun 7 06:37:46 2012 @@ -31,6 +31,7 @@ import org.apache.hedwig.protocol.PubSub import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; import org.apache.hedwig.server.common.ServerConfiguration; import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager; +import org.apache.hedwig.server.persistence.LocalDBPersistenceManager; import org.apache.hedwig.util.ConcurrencyUtils; import org.apache.hedwig.util.Either; import org.apache.hedwig.util.Callback; @@ -50,7 +51,8 @@ public class TestZkSubscriptionManager e super.setUp(); cfg = new ServerConfiguration(); final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - sm = new ZkSubscriptionManager(zk, new TrivialOwnAllTopicManager(cfg, scheduler), null, cfg, scheduler); + sm = new ZkSubscriptionManager(zk, new TrivialOwnAllTopicManager(cfg, scheduler), + LocalDBPersistenceManager.instance(), cfg, scheduler); msgIdCallback = new Callback() { @Override public void operationFailed(Object ctx, final PubSubException exception) { Modified: zookeeper/bookkeeper/tags/release-4.1.0/pom.xml URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/pom.xml?rev=1347417&r1=1347416&r2=1347417&view=diff ============================================================================== --- zookeeper/bookkeeper/tags/release-4.1.0/pom.xml (original) +++ zookeeper/bookkeeper/tags/release-4.1.0/pom.xml Thu Jun 7 06:37:46 2012 @@ -45,6 +45,11 @@ + org.codehaus.mojo + findbugs-maven-plugin + 2.3.2 + + maven-compiler-plugin 2.3.2 @@ -122,11 +127,6 @@ 2.1 - org.codehaus.mojo - findbugs-maven-plugin - 2.1 - - org.apache.maven.plugins maven-pmd-plugin 2.3