curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cammcken...@apache.org
Subject [1/5] curator git commit: Reworked WatcherRemovalManager. It now stores watchers only on successful operations. This is more like how ZK does it. Also, exists watcher must be stored when there is a NoNode result.
Date Wed, 01 Jun 2016 00:06:39 GMT
Repository: curator
Updated Branches:
  refs/heads/CURATOR-3.0 eefdf8ee9 -> e2200daad


Reworked WatcherRemovalManager. It now stores watchers only on successful operations. This
is more like how ZK does it.
Also, exists watcher must be stored when there is a NoNode result.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f59f23c7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f59f23c7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f59f23c7

Branch: refs/heads/CURATOR-3.0
Commit: f59f23c703815317d4ef1d39e2b815e402d1559b
Parents: eefdf8e
Author: randgalt <randgalt@apache.org>
Authored: Thu May 26 16:59:08 2016 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Thu May 26 16:59:08 2016 -0500

----------------------------------------------------------------------
 curator-framework/pom.xml                       |  14 ++
 .../curator/framework/imps/Backgrounding.java   |   5 -
 .../framework/imps/CuratorFrameworkImpl.java    |   2 -
 .../curator/framework/imps/EnsembleTracker.java |  22 +-
 .../framework/imps/ExistsBuilderImpl.java       |   7 +-
 .../framework/imps/GetChildrenBuilderImpl.java  |   8 +-
 .../framework/imps/GetConfigBuilderImpl.java    |  11 +-
 .../framework/imps/GetDataBuilderImpl.java      |   7 +-
 .../framework/imps/OperationAndData.java        |  14 +-
 .../imps/RemoveWatchesBuilderImpl.java          |   2 +-
 .../apache/curator/framework/imps/Watching.java |  41 +---
 .../curator/framework/imps/TestCleanState.java  | 103 +++++++++
 .../imps/TestWatcherRemovalManager.java         | 208 +++++++++++++++----
 curator-recipes/pom.xml                         |   7 +
 .../curator/framework/imps/TestCleanState.java  |  77 -------
 pom.xml                                         |   7 +
 16 files changed, 357 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/pom.xml
----------------------------------------------------------------------
diff --git a/curator-framework/pom.xml b/curator-framework/pom.xml
index d6575cc..1a65898 100644
--- a/curator-framework/pom.xml
+++ b/curator-framework/pom.xml
@@ -88,4 +88,18 @@
         </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
index 0b823c4..4ac2edc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
@@ -116,11 +116,6 @@ class Backgrounding
     {
         if ( e != null )
         {
-            if ( watching != null )
-            {
-                watching.resetCurrentWatcher();
-            }
-
             if ( errorListener != null )
             {
                 errorListener.unhandledError("n/a", e);

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 51485f2..aba14c6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -910,7 +910,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     void performBackgroundOperation(OperationAndData<?> operationAndData)
     {
-        operationAndData.resetCurrentWatcher();
         try
         {
             if ( !operationAndData.isConnectionRequired() || client.isConnected() )
@@ -930,7 +929,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
         catch ( Throwable e )
         {
-            operationAndData.resetCurrentWatcher();
             ThreadUtils.checkInterrupted(e);
 
             /**

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
index bc59512..0b93cab 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -43,6 +43,7 @@ import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.util.Arrays;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 @VisibleForTesting
@@ -52,6 +53,7 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
     private final WatcherRemoveCuratorFramework client;
     private final EnsembleProvider ensembleProvider;
     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final AtomicInteger outstanding = new AtomicInteger(0);
     private final AtomicReference<QuorumMaj> currentConfig = new AtomicReference<>(new
QuorumMaj(Maps.<Long, QuorumPeer.QuorumServer>newHashMap()));
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
@@ -121,22 +123,38 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
         return currentConfig.get();
     }
 
+    @VisibleForTesting
+    public boolean hasOutstanding()
+    {
+        return outstanding.get() > 0;
+    }
+
     private void reset() throws Exception
     {
-        if ( client.getState() == CuratorFrameworkState.STARTED )
+        if ( (client.getState() == CuratorFrameworkState.STARTED) && (state.get()
== State.STARTED) )
         {
             BackgroundCallback backgroundCallback = new BackgroundCallback()
             {
                 @Override
                 public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
                 {
+                    outstanding.decrementAndGet();
                     if ( (event.getType() == CuratorEventType.GET_CONFIG) && (event.getResultCode()
== KeeperException.Code.OK.intValue()) )
                     {
                         processConfigData(event.getData());
                     }
                 }
             };
-            client.getConfig().usingWatcher(this).inBackground(backgroundCallback).forEnsemble();
+            outstanding.incrementAndGet();
+            try
+            {
+                client.getConfig().usingWatcher(this).inBackground(backgroundCallback).forEnsemble();
+            }
+            catch ( Exception e )
+            {
+                outstanding.decrementAndGet();
+                throw e;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
index 964706f..960b577 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
@@ -132,7 +132,7 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>,
E
                 @Override
                 public void processResult(int rc, String path, Object ctx, Stat stat)
                 {
-                    watching.checkBackroundRc(rc);
+                    watching.commitWatcher(rc, true);
                     trace.commit();
                     CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.EXISTS,
rc, path, null, ctx, stat, null, null, null, null, null);
                     client.processBackgroundOperation(operationAndData, event);
@@ -222,8 +222,9 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>,
E
     private Stat pathInForegroundStandard(final String path) throws Exception
     {
         TimeTrace   trace = client.getZookeeperClient().startTracer("ExistsBuilderImpl-Foreground");
-        Stat        returnStat = watching.callWithRetry
+        Stat        returnStat = RetryLoop.callWithRetry
         (
+            client.getZookeeperClient(),
             new Callable<Stat>()
             {
                 @Override
@@ -237,6 +238,8 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>,
E
                     else
                     {
                         returnStat = client.getZooKeeper().exists(path, watching.getWatcher(path));
+                        int rc = (returnStat != null) ? KeeperException.NoNodeException.Code.OK.intValue()
: KeeperException.NoNodeException.Code.NONODE.intValue();
+                        watching.commitWatcher(rc, true);
                     }
                     return returnStat;
                 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
index 0b1bb07..000c911 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
@@ -19,6 +19,7 @@
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Lists;
+import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.BackgroundPathable;
@@ -30,6 +31,7 @@ import org.apache.curator.framework.api.Pathable;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.api.WatchPathable;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import java.util.List;
@@ -167,7 +169,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation<
                 @Override
                 public void processResult(int rc, String path, Object o, List<String>
strings, Stat stat)
                 {
-                    watching.checkBackroundRc(rc);
+                    watching.commitWatcher(rc, false);
                     trace.commit();
                     if ( strings == null )
                     {
@@ -214,8 +216,9 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation<
     private List<String> pathInForeground(final String path) throws Exception
     {
         TimeTrace       trace = client.getZookeeperClient().startTracer("GetChildrenBuilderImpl-Foreground");
-        List<String>    children = watching.callWithRetry
+        List<String>    children = RetryLoop.callWithRetry
         (
+            client.getZookeeperClient(),
             new Callable<List<String>>()
             {
                 @Override
@@ -229,6 +232,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation<
                     else
                     {
                         children = client.getZooKeeper().getChildren(path, watching.getWatcher(path),
responseStat);
+                        watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(),
false);
                     }
                     return children;
                 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
index 3a210b8..1ab9043 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
@@ -19,9 +19,11 @@
 
 package org.apache.curator.framework.imps;
 
+import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
 import org.apache.curator.framework.api.*;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
@@ -206,7 +208,7 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati
                 @Override
                 public void processResult(int rc, String path, Object ctx, byte[] data, Stat
stat)
                 {
-                    watching.checkBackroundRc(rc);
+                    watching.commitWatcher(rc, false);
                     trace.commit();
                     CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG,
rc, path, null, ctx, stat, data, null, null, null, null);
                     client.processBackgroundOperation(operationAndData, event);
@@ -232,8 +234,9 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati
         TimeTrace trace = client.getZookeeperClient().startTracer("GetConfigBuilderImpl-Foreground");
         try
         {
-            return watching.callWithRetry
+            return RetryLoop.callWithRetry
             (
+                client.getZookeeperClient(),
                 new Callable<byte[]>()
                 {
                     @Override
@@ -243,7 +246,9 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati
                         {
                             return client.getZooKeeper().getConfig(true, stat);
                         }
-                        return client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE),
stat);
+                        byte[] config = client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE),
stat);
+                        watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(),
false);
+                        return config;
                     }
                 }
             );

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index 5528138..bae126c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.imps;
 
+import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
 import org.apache.curator.framework.api.*;
 import org.apache.curator.utils.ThreadUtils;
@@ -238,7 +239,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>,
                 @Override
                 public void processResult(int rc, String path, Object ctx, byte[] data, Stat
stat)
                 {
-                    watching.checkBackroundRc(rc);
+                    watching.commitWatcher(rc, false);
                     trace.commit();
                     if ( decompress && (data != null) )
                     {
@@ -294,8 +295,9 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>,
     private byte[] pathInForeground(final String path) throws Exception
     {
         TimeTrace   trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Foreground");
-        byte[]      responseData = watching.callWithRetry
+        byte[]      responseData = RetryLoop.callWithRetry
         (
+            client.getZookeeperClient(),
             new Callable<byte[]>()
             {
                 @Override
@@ -309,6 +311,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>,
                     else
                     {
                         responseData = client.getZooKeeper().getData(path, watching.getWatcher(path),
responseStat);
+                        watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(),
false);
                     }
                     return responseData;
                 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 73ea38e..3d69e5d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -41,14 +41,13 @@ class OperationAndData<T> implements Delayed, RetrySleeper
     private final AtomicLong ordinal = new AtomicLong();
     private final Object context;
     private final boolean connectionRequired;
-    private final Watching watching;
 
     interface ErrorCallback<T>
     {
         void retriesExhausted(OperationAndData<T> operationAndData);
     }
     
-    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback,
ErrorCallback<T> errorCallback, Object context, boolean connectionRequired, Watching
watching)
+    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback,
ErrorCallback<T> errorCallback, Object context, boolean connectionRequired)
     {
         this.operation = operation;
         this.data = data;
@@ -56,7 +55,6 @@ class OperationAndData<T> implements Delayed, RetrySleeper
         this.errorCallback = errorCallback;
         this.context = context;
         this.connectionRequired = connectionRequired;
-        this.watching = watching;
         reset();
     }
 
@@ -68,7 +66,7 @@ class OperationAndData<T> implements Delayed, RetrySleeper
 
     OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback,
ErrorCallback<T> errorCallback, Object context, Watching watching)
     {
-        this(operation, data, callback, errorCallback, context, true, watching);
+        this(operation, data, callback, errorCallback, context, true);
     }
 
     Object getContext()
@@ -117,14 +115,6 @@ class OperationAndData<T> implements Delayed, RetrySleeper
         return operation;
     }
 
-    void resetCurrentWatcher()
-    {
-        if ( watching != null )
-        {
-            watching.resetCurrentWatcher();
-        }
-    }
-
     @Override
     public void sleepFor(long time, TimeUnit unit) throws InterruptedException
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index c2d4d8e..27a3c0f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -209,7 +209,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder,
RemoveWat
         }
         
         client.processBackgroundOperation(new OperationAndData<String>(this, path,
backgrounding.getCallback(),
-                                                                       errorCallback, backgrounding.getContext(),
!local, null), null);
+                                                                       errorCallback, backgrounding.getContext(),
!local), null);
     }
     
     private void pathInForeground(final String path) throws Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index 568f308..daa5dd3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -19,11 +19,9 @@
 
 package org.apache.curator.framework.imps;
 
-import org.apache.curator.RetryLoop;
 import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
-import java.util.concurrent.Callable;
 
 class Watching
 {
@@ -77,14 +75,6 @@ class Watching
             namespaceWatcher = new NamespaceWatcher(client, curatorWatcher, unfixedPath);
         }
 
-        if ( namespaceWatcher != null )
-        {
-            if ( client.getWatcherRemovalManager() != null )
-            {
-                client.getWatcherRemovalManager().add(namespaceWatcher);
-            }
-        }
-
         return namespaceWatcher;
     }
 
@@ -98,33 +88,24 @@ class Watching
         return watched;
     }
 
-    <T> T callWithRetry(Callable<T> proc) throws Exception
+    void commitWatcher(int rc, boolean isExists)
     {
-        resetCurrentWatcher();
-        try
-        {
-            return RetryLoop.callWithRetry(client.getZookeeperClient(), proc);
-        }
-        catch ( Exception e )
+        boolean doCommit = false;
+        if ( isExists )
         {
-            resetCurrentWatcher();
-            throw e;
+            doCommit = ((rc == KeeperException.Code.OK.intValue()) || (rc == KeeperException.Code.NONODE.intValue()));
         }
-    }
-
-    void resetCurrentWatcher()
-    {
-        if ( (namespaceWatcher != null) && (client.getWatcherRemovalManager() !=
null) )
+        else
         {
-            client.getWatcherRemovalManager().noteTriggeredWatcher(namespaceWatcher);
+            doCommit = (rc == KeeperException.Code.OK.intValue());
         }
-    }
 
-    void checkBackroundRc(int rc)
-    {
-        if ( rc != KeeperException.Code.OK.intValue() )
+        if ( doCommit && (namespaceWatcher != null) )
         {
-            resetCurrentWatcher();
+            if ( client.getWatcherRemovalManager() != null )
+            {
+                client.getWatcherRemovalManager().add(namespaceWatcher);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
new file mode 100644
index 0000000..aa759ee
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.WatchersDebug;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.ZooKeeper;
+import java.util.concurrent.Callable;
+
+public class TestCleanState
+{
+    public static void closeAndTestClean(CuratorFramework client)
+    {
+        if ( client == null )
+        {
+            return;
+        }
+
+        try
+        {
+            CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client;
+            EnsembleTracker ensembleTracker = internalClient.getEnsembleTracker();
+            if ( ensembleTracker != null )
+            {
+                while ( ensembleTracker.hasOutstanding() )
+                {
+                    Thread.sleep(100);
+                }
+                ensembleTracker.close();
+            }
+            ZooKeeper zooKeeper = internalClient.getZooKeeper();
+            if ( zooKeeper != null )
+            {
+                if ( WatchersDebug.getChildWatches(zooKeeper).size() != 0 )
+                {
+                    throw new AssertionError("One or more child watchers are still registered:
" + WatchersDebug.getChildWatches(zooKeeper));
+                }
+                if ( WatchersDebug.getExistWatches(zooKeeper).size() != 0 )
+                {
+                    throw new AssertionError("One or more exists watchers are still registered:
" + WatchersDebug.getExistWatches(zooKeeper));
+                }
+                if ( WatchersDebug.getDataWatches(zooKeeper).size() != 0 )
+                {
+                    throw new AssertionError("One or more data watchers are still registered:
" + WatchersDebug.getDataWatches(zooKeeper));
+                }
+            }
+        }
+        catch ( IllegalStateException ignore )
+        {
+            // client already closed
+        }
+        catch ( Exception e )
+        {
+            e.printStackTrace();    // not sure what to do here
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    public static void test(CuratorFramework client, Callable<Void> proc) throws Exception
+    {
+        boolean succeeded = false;
+        try
+        {
+            proc.call();
+            succeeded = true;
+        }
+        finally
+        {
+            if ( succeeded )
+            {
+                closeAndTestClean(client);
+            }
+            else
+            {
+                CloseableUtils.closeQuietly(client);
+            }
+        }
+    }
+
+    private TestCleanState()
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
index cdb625d..9c405a2 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
@@ -27,18 +27,141 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.WatchersDebug;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 
 public class TestWatcherRemovalManager extends BaseClassForTests
 {
     @Test
+    public void testSameWatcherDifferentPaths1Triggered() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+        try
+        {
+            client.start();
+            WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+            final CountDownLatch latch = new CountDownLatch(1);
+            Watcher watcher = new Watcher()
+            {
+                @Override
+                public void process(WatchedEvent event)
+                {
+                    latch.countDown();
+                }
+            };
+            removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+            removerClient.checkExists().usingWatcher(watcher).forPath("/d/e/f");
+            removerClient.create().creatingParentsIfNeeded().forPath("/d/e/f");
+
+            Timing timing = new Timing();
+            Assert.assertTrue(timing.awaitLatch(latch));
+            timing.sleepABit();
+
+            removerClient.removeWatchers();
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testSameWatcherDifferentPaths() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+        try
+        {
+            client.start();
+            WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+            Watcher watcher = new Watcher()
+            {
+                @Override
+                public void process(WatchedEvent event)
+                {
+                    // NOP
+                }
+            };
+            removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+            removerClient.checkExists().usingWatcher(watcher).forPath("/d/e/f");
+            Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(),
2);
+            removerClient.removeWatchers();
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testSameWatcherDifferentKinds1Triggered() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+        try
+        {
+            client.start();
+            WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+            final CountDownLatch latch = new CountDownLatch(1);
+            Watcher watcher = new Watcher()
+            {
+                @Override
+                public void process(WatchedEvent event)
+                {
+                    latch.countDown();
+                }
+            };
+
+            removerClient.create().creatingParentsIfNeeded().forPath("/a/b/c");
+            removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+            removerClient.getData().usingWatcher(watcher).forPath("/a/b/c");
+            removerClient.setData().forPath("/a/b/c", "new".getBytes());
+
+            Timing timing = new Timing();
+            Assert.assertTrue(timing.awaitLatch(latch));
+            timing.sleepABit();
+
+            removerClient.removeWatchers();
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testSameWatcherDifferentKinds() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+        try
+        {
+            client.start();
+            WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+            Watcher watcher = new Watcher()
+            {
+                @Override
+                public void process(WatchedEvent event)
+                {
+                    // NOP
+                }
+            };
+
+            removerClient.create().creatingParentsIfNeeded().forPath("/a/b/c");
+            removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+            removerClient.getData().usingWatcher(watcher).forPath("/a/b/c");
+            removerClient.removeWatchers();
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
     public void testWithRetry() throws Exception
     {
         server.stop();
@@ -68,7 +191,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -105,7 +228,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -134,47 +257,50 @@ public class TestWatcherRemovalManager extends BaseClassForTests
             {
                 // expected
             }
-            Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(),
0);
+            removerClient.removeWatchers();
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
     @Test
     public void testMissingNodeInBackground() throws Exception
     {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
-        try
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+        Callable<Void> proc = new Callable<Void>()
         {
-            client.start();
-            WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
-            Watcher w = new Watcher()
+            @Override
+            public Void call() throws Exception
             {
-                @Override
-                public void process(WatchedEvent event)
+                client.start();
+                WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+                Watcher w = new Watcher()
                 {
-                    // NOP
-                }
-            };
-            final CountDownLatch latch = new CountDownLatch(1);
-            BackgroundCallback callback = new BackgroundCallback()
-            {
-                @Override
-                public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
+                    @Override
+                    public void process(WatchedEvent event)
+                    {
+                        // NOP
+                    }
+                };
+                final CountDownLatch latch = new CountDownLatch(1);
+                BackgroundCallback callback = new BackgroundCallback()
                 {
-                    latch.countDown();
-                }
-            };
-            removerClient.getData().usingWatcher(w).inBackground(callback).forPath("/one/two/three");
-            Assert.assertTrue(new Timing().awaitLatch(latch));
-            Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(),
0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(client);
-        }
+                    @Override
+                    public void processResult(CuratorFramework client, CuratorEvent event)
throws Exception
+                    {
+                        latch.countDown();
+                    }
+                };
+                removerClient.getData().usingWatcher(w).inBackground(callback).forPath("/one/two/three");
+                Assert.assertTrue(new Timing().awaitLatch(latch));
+                Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(),
0);
+                removerClient.removeWatchers();
+                return null;
+            }
+        };
+        TestCleanState.test(client, proc);
     }
 
     @Test
@@ -188,7 +314,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -203,7 +329,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -222,7 +348,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -241,7 +367,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -252,6 +378,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
         try
         {
             client.start();
+            client.create().forPath("/test");
 
             WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
 
@@ -264,14 +391,15 @@ public class TestWatcherRemovalManager extends BaseClassForTests
                 }
             };
 
-            removerClient.getData().usingWatcher(watcher).forPath("/");
+            removerClient.getData().usingWatcher(watcher).forPath("/test");
             Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
-            removerClient.getData().usingWatcher(watcher).forPath("/");
+            removerClient.getData().usingWatcher(watcher).forPath("/test");
             Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+            removerClient.removeWatchers();
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -308,7 +436,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -364,7 +492,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-recipes/pom.xml
----------------------------------------------------------------------
diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml
index 17414c2..0443adc 100644
--- a/curator-recipes/pom.xml
+++ b/curator-recipes/pom.xml
@@ -52,6 +52,13 @@
 
         <dependency>
             <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
             <artifactId>curator-test</artifactId>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
deleted file mode 100644
index f90f463..0000000
--- a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.test.WatchersDebug;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.zookeeper.ZooKeeper;
-
-public class TestCleanState
-{
-    public static void closeAndTestClean(CuratorFramework client)
-    {
-        if ( client == null )
-        {
-            return;
-        }
-
-        try
-        {
-            CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client;
-            EnsembleTracker ensembleTracker = internalClient.getEnsembleTracker();
-            if ( ensembleTracker != null )
-            {
-                ensembleTracker.close();
-            }
-            ZooKeeper zooKeeper = internalClient.getZooKeeper();
-            if ( zooKeeper != null )
-            {
-                if ( WatchersDebug.getChildWatches(zooKeeper).size() != 0 )
-                {
-                    throw new AssertionError("One or more child watchers are still registered:
" + WatchersDebug.getChildWatches(zooKeeper));
-                }
-                if ( WatchersDebug.getExistWatches(zooKeeper).size() != 0 )
-                {
-                    throw new AssertionError("One or more exists watchers are still registered:
" + WatchersDebug.getExistWatches(zooKeeper));
-                }
-                if ( WatchersDebug.getDataWatches(zooKeeper).size() != 0 )
-                {
-                    throw new AssertionError("One or more data watchers are still registered:
" + WatchersDebug.getDataWatches(zooKeeper));
-                }
-            }
-        }
-        catch ( IllegalStateException ignore )
-        {
-            // client already closed
-        }
-        catch ( Exception e )
-        {
-            e.printStackTrace();    // not sure what to do here
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private TestCleanState()
-    {
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c89092d..384b310 100644
--- a/pom.xml
+++ b/pom.xml
@@ -322,6 +322,13 @@
 
             <dependency>
                 <groupId>org.apache.curator</groupId>
+                <artifactId>curator-framework</artifactId>
+                <type>test-jar</type>
+                <version>${project.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.curator</groupId>
                 <artifactId>curator-recipes</artifactId>
                 <version>${project.version}</version>
             </dependency>


Mime
View raw message