distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject incubator-distributedlog git commit: DL-88: remove watches when unregister children watches
Date Sat, 17 Dec 2016 06:50:33 GMT
Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master b4150fc84 -> a1c15f8ab


DL-88: remove watches when unregister children watches

merge twitter's change from Sijie Guo

Author: Sijie Guo <sijieg@twitter.com>
Author: Jordan Bull <jbull@twitter.com>
Author: Leigh Stewart <lstewart@twitter.com>

Reviewers: Leigh Stewart <lstewart@apache.org>

Closes #60 from sijie/merge/DL-88


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/a1c15f8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/a1c15f8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/a1c15f8a

Branch: refs/heads/master
Commit: a1c15f8ab014417e67bd876ccb338883a9d112c8
Parents: b4150fc
Author: Sijie Guo <sijieg@twitter.com>
Authored: Fri Dec 16 22:50:41 2016 -0800
Committer: Sijie Guo <sijie@apache.org>
Committed: Fri Dec 16 22:50:41 2016 -0800

----------------------------------------------------------------------
 .../twitter/distributedlog/BKLogHandler.java    |  7 ----
 .../distributedlog/BKLogReadHandler.java        |  7 +++-
 .../distributedlog/BKLogWriteHandler.java       |  6 +++-
 .../twitter/distributedlog/ZooKeeperClient.java |  1 +
 .../readahead/ReadAheadWorker.java              |  2 +-
 .../distributedlog/zk/ZKWatcherManager.java     | 36 ++++++++++++++++++--
 .../distributedlog/zk/TestZKWatcherManager.java |  3 +-
 7 files changed, 49 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index 9aa3465..a6ec318 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -702,13 +702,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable,
AsyncAbor
     }
 
     @Override
-    public Future<Void> asyncClose() {
-        // No-op
-        this.zooKeeperClient.getWatcherManager().unregisterChildWatcher(logMetadata.getLogSegmentsPath(),
this);
-        return Future.Void();
-    }
-
-    @Override
     public Future<Void> asyncAbort() {
         return asyncClose();
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 0bf6b84..6a8f90e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -311,7 +311,12 @@ class BKLogReadHandler extends BKLogHandler {
                 if (null != handleCache) {
                     handleCache.clear();
                 }
-                return BKLogReadHandler.super.asyncClose();
+                // No-op
+                zooKeeperClient.getWatcherManager().unregisterChildWatcher(
+                        logMetadata.getLogSegmentsPath(),
+                        BKLogReadHandler.this,
+                        true);
+                return Future.Void();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index 573679a..4665ed5 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -1263,7 +1263,11 @@ class BKLogWriteHandler extends BKLogHandler {
         ).flatMap(new AbstractFunction1<Void, Future<Void>>() {
             @Override
             public Future<Void> apply(Void result) {
-                return BKLogWriteHandler.super.asyncClose();
+                zooKeeperClient.getWatcherManager().unregisterChildWatcher(
+                        logMetadata.getLogSegmentsPath(),
+                        BKLogWriteHandler.this,
+                        false);
+                return Future.Void();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
index 912d592..9ea9e37 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
@@ -169,6 +169,7 @@ public class ZooKeeperClient {
         this.credentials = credentials;
         this.watcherManager = ZKWatcherManager.newBuilder()
                 .name(name)
+                .zkc(this)
                 .statsLogger(statsLogger.scope("watcher_manager"))
                 .build();
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
index a3fd239..5c15009 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
@@ -359,7 +359,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher,
As
         running = false;
 
         this.zkc.getWatcherManager()
-                .unregisterChildWatcher(this.logMetadata.getLogSegmentsPath(), this);
+                .unregisterChildWatcher(this.logMetadata.getLogSegmentsPath(), this, true);
 
         // Aside from unfortunate naming of variables, this allows
         // the currently active long poll to be interrupted and completed

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
index 4068737..03b2841 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
@@ -17,8 +17,11 @@
  */
 package com.twitter.distributedlog.zk;
 
+import com.twitter.distributedlog.ZooKeeperClient;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
@@ -50,31 +53,40 @@ public class ZKWatcherManager implements Watcher {
 
         private String _name;
         private StatsLogger _statsLogger;
+        private ZooKeeperClient _zkc;
 
         public Builder name(String name) {
             this._name = name;
             return this;
         }
 
+        public Builder zkc(ZooKeeperClient zkc) {
+            this._zkc = zkc;
+            return this;
+        }
+
         public Builder statsLogger(StatsLogger statsLogger) {
             this._statsLogger = statsLogger;
             return this;
         }
 
         public ZKWatcherManager build() {
-            return new ZKWatcherManager(_name, _statsLogger);
+            return new ZKWatcherManager(_name, _zkc, _statsLogger);
         }
     }
 
     private final String name;
+    private final ZooKeeperClient zkc;
     private final StatsLogger statsLogger;
 
     protected final ConcurrentMap<String, Set<Watcher>> childWatches;
     protected final AtomicInteger allWatchesGauge;
 
     private ZKWatcherManager(String name,
+                             ZooKeeperClient zkc,
                              StatsLogger statsLogger) {
         this.name = name;
+        this.zkc = zkc;
         this.statsLogger = statsLogger;
 
         // watches
@@ -127,7 +139,7 @@ public class ZKWatcherManager implements Watcher {
         return this;
     }
 
-    public void unregisterChildWatcher(String path, Watcher watcher) {
+    public void unregisterChildWatcher(String path, Watcher watcher, boolean removeFromServer)
{
         Set<Watcher> watchers = childWatches.get(path);
         if (null == watchers) {
             logger.warn("No watchers found on path {} while unregistering child watcher {}.",
@@ -141,6 +153,26 @@ public class ZKWatcherManager implements Watcher {
                 logger.warn("Remove a non-registered child watcher {} from path {}", watcher,
path);
             }
             if (watchers.isEmpty()) {
+                // best-efforts to remove watches
+                try {
+                    if (null != zkc && removeFromServer) {
+                        zkc.get().removeWatches(path, this, WatcherType.Children, true, new
AsyncCallback.VoidCallback() {
+                            @Override
+                            public void processResult(int rc, String path, Object ctx) {
+                                if (KeeperException.Code.OK.intValue() == rc) {
+                                    logger.debug("Successfully removed children watches from
{}", path);
+                                } else {
+                                    logger.debug("Encountered exception on removing children
watches from {}",
+                                            path, KeeperException.create(KeeperException.Code.get(rc)));
+                                }
+                            }
+                        }, null);
+                    }
+                } catch (InterruptedException e) {
+                    logger.debug("Encountered exception on removing watches from {}", path,
e);
+                } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+                    logger.debug("Encountered exception on removing watches from {}", path,
e);
+                }
                 childWatches.remove(path, watchers);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
index ee00ab9..3ad181d 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
@@ -34,6 +34,7 @@ public class TestZKWatcherManager {
     public void testRegisterUnregisterWatcher() throws Exception {
         ZKWatcherManager watcherManager = ZKWatcherManager.newBuilder()
                 .name("test-register-unregister-watcher")
+                .zkc(null)
                 .statsLogger(NullStatsLogger.INSTANCE)
                 .build();
         String path = "/test-register-unregister-watcher";
@@ -71,7 +72,7 @@ public class TestZKWatcherManager {
         assertEquals(event2, events.get(1));
 
         // unregister watcher
-        watcherManager.unregisterChildWatcher(path, watcher);
+        watcherManager.unregisterChildWatcher(path, watcher, true);
 
         assertEquals(0, watcherManager.childWatches.size());
     }


Mime
View raw message