bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-1061: BookieWatcher should not do ZK blocking operations from ZK async callback thread
Date Mon, 15 May 2017 19:35:34 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master d86351371 -> 5d43260e8


BOOKKEEPER-1061: BookieWatcher should not do ZK blocking operations from ZK async callback
thread

In some cases, the BookieWatcher can get the ZK event thread stuck. This happens when a ZK
blocking request is issued from a ZK callback thread.

We should decouple the blocking requests in a separate executor to avoid deadlocking ZK client.

Author: Matteo Merli <mmerli@apache.org>

Reviewers: Jia Zhai <None>, Sijie Guo <sijie@apache.org>

Closes #149 from merlimat/bookie-watcher-thread


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/5d43260e
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/5d43260e
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/5d43260e

Branch: refs/heads/master
Commit: 5d43260e84c6121df72c0ee4c844651bfb726638
Parents: d863513
Author: Matteo Merli <mmerli@apache.org>
Authored: Mon May 15 12:35:26 2017 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Mon May 15 12:35:26 2017 -0700

----------------------------------------------------------------------
 .../apache/bookkeeper/client/BookieWatcher.java | 29 +++++++++++---------
 1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5d43260e/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index ae0ac50..e15f49a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -17,6 +17,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -183,14 +185,18 @@ class BookieWatcher implements Watcher, ChildrenCallback {
 
         HashSet<BookieSocketAddress> newBookieAddrs = convertToBookieAddresses(children);
 
-        synchronized (this) {
-            Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
-            placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
-            if (bk.conf.getDiskWeightBasedPlacementEnabled()) {
-                // start collecting bookieInfo for the newly joined bookies, if any
-                bk.bookieInfoReader.availableBookiesChanged(newBookieAddrs);
+        // Update watcher outside ZK callback thread, to avoid deadlock in case some other
+        // component is trying to do a blocking ZK operation
+        bk.mainWorkerPool.submitOrdered(path, safeRun(() -> {
+            synchronized (BookieWatcher.this) {
+                Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
+                placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
+                if (bk.conf.getDiskWeightBasedPlacementEnabled()) {
+                    // start collecting bookieInfo for the newly joined bookies, if any
+                    bk.bookieInfoReader.availableBookiesChanged(newBookieAddrs);
+                }
             }
-        }
+        }));
 
         // we don't need to close clients here, because:
         // a. the dead bookies will be removed from topology, which will not be used in new
ensemble.
@@ -236,13 +242,10 @@ class BookieWatcher implements Watcher, ChildrenCallback {
         final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
         readBookies(new ChildrenCallback() {
             public void processResult(int rc, String path, Object ctx, List<String>
children) {
-                try {
+                bk.mainWorkerPool.submitOrdered(path, safeRun(() -> {
                     BookieWatcher.this.processResult(rc, path, ctx, children);
-                    queue.put(rc);
-                } catch (InterruptedException e) {
-                    logger.error("Interruped when trying to read bookies in a blocking fashion");
-                    throw new RuntimeException(e);
-                }
+                    queue.add(rc);
+                }));
             }
         });
         int rc = queue.take();


Mime
View raw message