zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1515769 - in /zookeeper/bookkeeper/trunk: CHANGES.txt bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
Date Tue, 20 Aug 2013 10:51:50 GMT
Author: ivank
Date: Tue Aug 20 10:51:50 2013
New Revision: 1515769

URL: http://svn.apache.org/r1515769
Log:
BOOKKEEPER-668: Race between PerChannelBookieClient#channelDisconnected() and disconnect()
calls can make clients hang while add/reading entries in case of multiple bookie failures
(sijie & ivank via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1515769&r1=1515768&r2=1515769&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Aug 20 10:51:50 2013
@@ -88,6 +88,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-667: Client write will fail with BadMetadataVersion in case of multiple Bookie failures with AutoRecovery enabled (sijie via ivank)
 
+        BOOKKEEPER-668: Race between PerChannelBookieClient#channelDisconnected() and disconnect() calls can make clients hang while add/reading entries in case of multiple bookie failures (sijie & ivank via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1515769&r1=1515768&r2=1515769&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Tue Aug 20 10:51:50 2013
@@ -365,10 +365,16 @@ public class PerChannelBookieClient exte
             public void safeRun() {
 
                 ReadCompletion readCompletion = readCompletions.remove(key);
+                String bAddress = "null";
+                Channel c = channel;
+                if(c != null) {
+                    bAddress = c.getRemoteAddress().toString();
+                }
 
                 if (readCompletion != null) {
-                    LOG.error("Could not write  request for reading entry: " + key.entryId + " ledger-id: "
-                              + key.ledgerId + " bookie: " + channel.getRemoteAddress());
+                    LOG.error("Could not write request for reading entry: {}"
+                              + " ledger-id: {} bookie: {}",
+                              new Object[] { key.entryId, key.ledgerId, bAddress });
 
                     readCompletion.cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException,
                                                         key.ledgerId, key.entryId, null, readCompletion.ctx);
@@ -387,10 +393,12 @@ public class PerChannelBookieClient exte
 
                 if (addCompletion != null) {
                     String bAddress = "null";
-                    if(channel != null)
-                        bAddress = channel.getRemoteAddress().toString();
-                    LOG.error("Could not write request for adding entry: " + key.entryId + " ledger-id: "
-                              + key.ledgerId + " bookie: " + bAddress);
+                    Channel c = channel;
+                    if(c != null) {
+                        bAddress = c.getRemoteAddress().toString();
+                    }
+                    LOG.error("Could not write request for adding entry: {} ledger-id: {} bookie: {}",
+                              new Object[] { key.entryId, key.ledgerId, bAddress });
 
                     addCompletion.cb.writeComplete(BKException.Code.BookieHandleNotAvailableException, key.ledgerId,
                                                    key.entryId, addr, addCompletion.ctx);
@@ -459,9 +467,8 @@ public class PerChannelBookieClient exte
         if (c != null) {
             closeChannel(c);
         }
-        if (this.channel == c) {
-            errorOutOutstandingEntries();
-        }
+
+        errorOutOutstandingEntries();
 
         synchronized (this) {
             if (this.channel == c

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java?rev=1515769&r1=1515768&r2=1515769&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java Tue Aug 20 10:51:50 2013
@@ -29,11 +29,20 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
@@ -202,9 +211,75 @@ public class TestPerChannelBookieClient 
         disconnectThread.join();
         checkThread.join();
         assertFalse("Failure in threads, check logs", shouldFail.get());
+        client.close();
+        channelFactory.releaseExternalResources();
+        executor.shutdown();
+    }
+
+    /**
+     * Test that requests are completed even if the channel is disconnected
+     * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-668}
+     */
+    @Test(timeout=60000)
+    public void testRequestCompletesAfterDisconnectRace() throws Exception {
+        ServerConfiguration conf = killBookie(0);
+
+        Bookie delayBookie = new Bookie(conf) {
+            @Override
+            public ByteBuffer readEntry(long ledgerId, long entryId)
+                    throws IOException, NoLedgerException {
+                try {
+                    Thread.sleep(3000);
+                } catch (InterruptedException ie) {
+                    throw new IOException("Interrupted waiting", ie);
+                }
+                return super.readEntry(ledgerId, entryId);
+            }
+        };
+        bsConfs.add(conf);
+        bs.add(startBookie(conf, delayBookie));
+
+        ClientSocketChannelFactory channelFactory
+            = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                                                Executors.newCachedThreadPool());
+        final OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
+        InetSocketAddress addr = getBookie(0);
+        AtomicLong bytesOutstanding = new AtomicLong(0);
+
+        final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory,
+                                                                         addr, bytesOutstanding);
+        final CountDownLatch completion = new CountDownLatch(1);
+        final ReadEntryCallback cb = new ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(int rc, long ledgerId, long entryId,
+                                              ChannelBuffer buffer, Object ctx) {
+                    completion.countDown();
+                }
+            };
 
+        client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
+            @Override
+            public void operationComplete(final int rc, Void result) {
+                if (rc != BKException.Code.OK) {
+                    executor.submitOrdered(1, new SafeRunnable() {
+                        @Override
+                        public void safeRun() {
+                            cb.readEntryComplete(rc, 1, 1, null, null);
+                        }
+                    });
+                    return;
+                }
+
+                client.readEntryAndFenceLedger(1, "00000111112222233333".getBytes(), 1, cb, null);
+            }
+        });
+
+        Thread.sleep(1000);
+        client.disconnect();
         client.close();
         channelFactory.releaseExternalResources();
         executor.shutdown();
+
+        assertTrue("Request should have completed", completion.await(5, TimeUnit.SECONDS));
     }
 }



Mime
View raw message