zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject svn commit: r1346247 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ bookkeeper-server/src/test/java/org/apache/bookkeeper/test/
Date Tue, 05 Jun 2012 06:00:20 GMT
Author: sijie
Date: Tue Jun  5 06:00:19 2012
New Revision: 1346247

URL: http://svn.apache.org/viewvc?rev=1346247&view=rev
Log:
BOOKKEEPER-281: BKClient is failing when zkclient connection delays (ivank via sijie)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
  (with props)
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1346247&r1=1346246&r2=1346247&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Jun  5 06:00:19 2012
@@ -134,6 +134,8 @@ Release 4.1.0 - 2012-05-31
 
         BOOKKEEPER-273: LedgerHandle.deleteLedger() should be idempotent (Matteo Merli via ivank)
 
+        BOOKKEEPER-281: BKClient is failing when zkclient connection delays (ivank via sijie)
+
       hedwig-client/
 
         BOOKKEEPER-217: NPE in hedwig client when enable DEBUG (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1346247&r1=1346246&r2=1346247&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Tue Jun  5 06:00:19 2012
@@ -119,27 +119,35 @@ public class BookKeeper {
      * @throws InterruptedException
      * @throws KeeperException
      */
-    public BookKeeper(ClientConfiguration conf)
+    public BookKeeper(final ClientConfiguration conf)
             throws IOException, InterruptedException, KeeperException {
         this.conf = conf;
+
+        final CountDownLatch zkConnectLatch = new CountDownLatch(1);
         this.zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(),
                 new Watcher() {
                     @Override
                     public void process(WatchedEvent event) {
-                        if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
-                            connectLatch.countDown();
-                        }
+                        // countdown the latch on all events, even if we haven't
+                        // successfully connected.
+                        zkConnectLatch.countDown();
+
                         // TODO: handle session disconnects and expires
                         LOG.debug("Process: {} {}", event.getType(), event.getPath());
                     }
                 });
+        if (!zkConnectLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)
+            || !zk.getState().isConnected()) {
+            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+        }
+
         this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                                                                 Executors.newCachedThreadPool());
-        bookieWatcher = new BookieWatcher(conf, this);
-        bookieWatcher.readBookiesBlocking();
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
-        // initialize ledger meta manager
+        bookieWatcher = new BookieWatcher(conf, this);
+        bookieWatcher.readBookiesBlocking();
+
         ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
 
         ownChannelFactory = true;
@@ -176,49 +184,33 @@ public class BookKeeper {
      *          {@link ClientConfiguration}
      * @param zk
      *          Zookeeper client instance connected to the zookeeper with which
-     *          the bookies have registered
+     *          the bookies have registered. The ZooKeeper client must be connected
+     *          before it is passed to BookKeeper. Otherwise a KeeperException is thrown.
      * @param channelFactory
      *          A factory that will be used to create connections to the bookies
      * @throws IOException
      * @throws InterruptedException
-     * @throws KeeperException
+     * @throws KeeperException if the passed zk handle is not connected
      */
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
             throws IOException, InterruptedException, KeeperException {
         if (zk == null || channelFactory == null) {
             throw new NullPointerException();
         }
+        if (!zk.getState().isConnected()) {
+            LOG.error("Unconnected zookeeper handle passed to bookkeeper");
+            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+        }
         this.conf = conf;
         this.zk = zk;
         this.channelFactory = channelFactory;
-        bookieWatcher = new BookieWatcher(conf, this);
-        bookieWatcher.readBookiesBlocking();
+
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
-        // initialize ledger meta manager
-        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
-    }
+        bookieWatcher = new BookieWatcher(conf, this);
+        bookieWatcher.readBookiesBlocking();
 
-    void withZKConnected(final ZKConnectCallback cb) {
-        if (ownZKHandle) {
-            mainWorkerPool.submit(new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        try {
-                            if (!connectLatch.await(zkConnectTimeoutMs, TimeUnit.MILLISECONDS)) {
-                                cb.connectionFailed(BKException.Code.ZKException);
-                            } else {
-                                cb.connected();
-                            }
-                        } catch (InterruptedException ie) {
-                            // someone trying to kill the process
-                            cb.connectionFailed(BKException.Code.InterruptedException);
-                        }
-                    }
-                });
-        } else {
-            cb.connected();
-        }
+        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
     }
 
     LedgerManager getLedgerManager() {
@@ -278,15 +270,8 @@ public class BookKeeper {
      */
     public void asyncCreateLedger(final int ensSize, final int qSize, final DigestType digestType,
                                   final byte[] passwd, final CreateCallback cb, final Object ctx) {
-        withZKConnected(new ZKConnectCallback() {
-                public void connected() {
-                    new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
-                        .initiate();
-                }
-                public void connectionFailed(int code) {
-                    cb.createComplete(code, null, ctx);
-                }
-            });
+        new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
+            .initiate();
     }
 
 
@@ -370,14 +355,7 @@ public class BookKeeper {
      */
     public void asyncOpenLedger(final long lId, final DigestType digestType, final byte passwd[],
                                 final OpenCallback cb, final Object ctx) {
-        withZKConnected(new ZKConnectCallback() {
-                public void connected() {
-                    new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
-                }
-                public void connectionFailed(int code) {
-                    cb.openComplete(code, null, ctx);
-                }
-            });
+        new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
     }
 
     /**
@@ -409,14 +387,7 @@ public class BookKeeper {
      */
     public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestType, final byte passwd[],
                                           final OpenCallback cb, final Object ctx) {
-        withZKConnected(new ZKConnectCallback() {
-                public void connected() {
-                    new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
-                }
-                public void connectionFailed(int code) {
-                    cb.openComplete(code, null, ctx);
-                }
-            });
+        new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
     }
 
 
@@ -502,14 +473,7 @@ public class BookKeeper {
      *            optional control object
      */
     public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Object ctx) {
-        withZKConnected(new ZKConnectCallback() {
-                public void connected() {
-                    new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
-                }
-                public void connectionFailed(int code) {
-                    cb.deleteComplete(code, ctx);
-                }
-            });
+        new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
     }
 
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1346247&r1=1346246&r2=1346247&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Tue Jun  5 06:00:19 2012
@@ -30,6 +30,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -127,18 +129,25 @@ public class BookKeeperAdmin {
      */
     public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException {
         // Create the ZooKeeper client instance
+        final CountDownLatch latch = new CountDownLatch(1);
         zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher() {
             @Override
             public void process(WatchedEvent event) {
+                latch.countDown();
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Process: " + event.getType() + " " + event.getPath());
                 }
             }
         });
+        if (!latch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)
+            || !zk.getState().isConnected()) {
+            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+        }
         // Create the bookie path
         bookiesPath = conf.getZkAvailableBookiesPath();
         // Create the BookKeeper client instance
-        bkc = new BookKeeper(conf);
+        bkc = new BookKeeper(conf, zk);
+
         DIGEST_TYPE = conf.getBookieRecoveryDigestType();
         PASSWD = conf.getBookieRecoveryPasswd();
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1346247&r1=1346246&r2=1346247&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Tue Jun  5 06:00:19 2012
@@ -115,10 +115,10 @@ class BookieWatcher implements Watcher, 
             newBookieAddrs.add(bookieAddr);
         }
 
-        HashSet<InetSocketAddress> deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
-        deadBookies.removeAll(newBookieAddrs);
-
+        final HashSet<InetSocketAddress> deadBookies;
         synchronized (this) {
+            deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
+            deadBookies.removeAll(newBookieAddrs);
             knownBookies = newBookieAddrs;
         }
 

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java?rev=1346247&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java Tue Jun  5 06:00:19 2012
@@ -0,0 +1,88 @@
+package org.apache.bookkeeper.client;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests of the main BookKeeper client
+ */
+public class BookKeeperTest extends BookKeeperClusterTestCase {
+    public BookKeeperTest() {
+        super(4);
+    }
+
+    @Test
+    public void testConstructionZkDelay() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setZkTimeout(20000);
+
+        CountDownLatch l = new CountDownLatch(1);
+        zkUtil.sleepServer(5, l);
+        l.await();
+
+        BookKeeper bkc = new BookKeeper(conf);
+        bkc.createLedger(DigestType.CRC32, "testPasswd".getBytes()).close();
+        bkc.close();
+    }
+
+    @Test
+    public void testConstructionNotConnectedExplicitZk() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setZkTimeout(20000);
+
+        CountDownLatch l = new CountDownLatch(1);
+        zkUtil.sleepServer(5, l);
+        l.await();
+
+        ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), 10000,
+                            new Watcher() {
+                                @Override
+                                public void process(WatchedEvent event) {
+                                }
+                            });
+        assertFalse("ZK shouldn't have connected yet", zk.getState().isConnected());
+        try {
+            BookKeeper bkc = new BookKeeper(conf, zk);
+            fail("Shouldn't be able to construct with unconnected zk");
+        } catch (KeeperException.ConnectionLossException cle) {
+            // correct behaviour
+        }
+    }
+}
\ No newline at end of file

Propchange: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java?rev=1346247&r1=1346246&r2=1346247&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java Tue Jun  5 06:00:19 2012
@@ -88,7 +88,7 @@ public class TestReadTimeout extends Boo
                         completed.set(true);
                     }
                 }, null);
-        Thread.sleep((baseClientConf.getReadTimeout()*2)*1000);
+        Thread.sleep((baseClientConf.getReadTimeout()*3)*1000);
         Assert.assertTrue("Write request did not finish", completed.get());
 
         Set<InetSocketAddress> afterSet = new HashSet<InetSocketAddress>();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1346247&r1=1346246&r2=1346247&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Tue Jun  5 06:00:19 2012
@@ -22,6 +22,8 @@
 package org.apache.bookkeeper.test;
 
 import java.io.File;
+import java.io.IOException;
+
 import java.net.InetSocketAddress;
 
 import org.apache.commons.io.FileUtils;
@@ -115,6 +117,31 @@ public class ZooKeeperUtil {
         zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
+    public void sleepServer(final int seconds, final CountDownLatch l)
+            throws InterruptedException, IOException {
+        Thread[] allthreads = new Thread[Thread.activeCount()];
+        Thread.enumerate(allthreads);
+        for (final Thread t : allthreads) {
+            if (t.getName().contains("SyncThread:0")) {
+                Thread sleeper = new Thread() {
+                    public void run() {
+                        try {
+                            t.suspend();
+                            l.countDown();
+                            Thread.sleep(seconds*1000);
+                            t.resume();
+                        } catch (Exception e) {
+                            LOG.error("Error suspending thread", e);
+                        }
+                    }
+                };
+                sleeper.start();
+                return;
+            }
+        }
+        throw new IOException("ZooKeeper thread not found");
+    }
+
     public void killServer() throws Exception {
         if (zkc != null) {
             zkc.close();



Mime
View raw message