curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [4/7] curator git commit: CURATOR-161 - Added support for guaranteed removal of watches. This includes refactoring the FailedDeleteManager code into a FailedOperationManager to allow subclassing.
Date Wed, 20 May 2015 00:53:13 GMT
CURATOR-161 - Added support for guaranteed removal of watches. This
includes refactoring the FailedDeleteManager code into a
FailedOperationManager to allow subclassing.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/22d034af
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/22d034af
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/22d034af

Branch: refs/heads/CURATOR-217
Commit: 22d034af90987940420649c5f320e8dc09910c8a
Parents: 04caf36
Author: Cameron McKenzie <cameron@unico.com.au>
Authored: Wed May 13 09:28:45 2015 +1000
Committer: Cameron McKenzie <cameron@unico.com.au>
Committed: Wed May 13 09:28:45 2015 +1000

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java     |   4 +
 .../curator/framework/api/DeleteBuilder.java    |   2 +-
 .../curator/framework/api/Guaranteeable.java    |  20 +--
 .../framework/api/GuaranteeableDelete.java      |  39 ++++++
 .../framework/api/RemoveWatchesType.java        |   2 +-
 .../framework/imps/CuratorFrameworkImpl.java    |   8 ++
 .../framework/imps/DeleteBuilderImpl.java       |   4 +-
 .../framework/imps/FailedDeleteManager.java     |  39 +-----
 .../framework/imps/FailedOperationManager.java  |  65 ++++++++++
 .../imps/FailedRemoveWatchManager.java          |  56 ++++++++
 .../framework/imps/NamespaceWatcherMap.java     |   8 ++
 .../imps/RemoveWatchesBuilderImpl.java          |  56 ++++++--
 .../framework/imps/TestFailedDeleteManager.java |   9 +-
 .../framework/imps/TestRemoveWatches.java       | 129 +++++++++++++++++++
 14 files changed, 377 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 4b30fd4..2bce552 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -216,7 +216,11 @@ public interface CuratorFramework extends Closeable
      * Call this method on watchers you are no longer interested in.
      *
      * @param watcher the watcher
+     * 
+     * @deprecated As of ZooKeeper 3.5 Curators recipes will handle removing watcher references
+     * when they are no longer used.
      */
+    @Deprecated
     public void clearWatcherReferences(Watcher watcher);
         
     /**

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
b/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
index 3a3faf7..893e825 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
@@ -18,6 +18,6 @@
  */
 package org.apache.curator.framework.api;
 
-public interface DeleteBuilder extends Guaranteeable, ChildrenDeletable
+public interface DeleteBuilder extends GuaranteeableDelete, ChildrenDeletable
 {
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
b/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
index 481911b..b43d6b0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
@@ -18,23 +18,15 @@
  */
 package org.apache.curator.framework.api;
 
-public interface Guaranteeable extends BackgroundVersionable
+public interface Guaranteeable<T>
 {
     /**
-     * <p>
-     *     Solves this edge case: deleting a node can fail due to connection issues. Further,
-     *     if the node was ephemeral, the node will not get auto-deleted as the session is
still valid.
-     *     This can wreak havoc with lock implementations.
-     * </p>
-     *
-     * <p>
-     *     When <code>guaranteed</code> is set, Curator will record failed node
deletions and
-     *     attempt to delete them in the background until successful. NOTE: you will still
get an
-     *     exception when the deletion fails. But, you can be assured that as long as the
-     *     {@link org.apache.curator.framework.CuratorFramework} instance is open attempts
will be made to delete the node.
-     * </p>
+     * Solves edge cases where an operation may succeed on the server but connection failure
occurs before a
+     * response can be successfully returned to the client.
+     * 
+     * @see org.apache.curator.framework.api.GuaranteeableDelete 
      *  
      * @return this
      */
-    public ChildrenDeletable guaranteed();
+    public T guaranteed();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java
b/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java
new file mode 100644
index 0000000..d04e7ea
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * <p>
+ *     Solves this edge case: deleting a node can fail due to connection issues. Further,
+ *     if the node was ephemeral, the node will not get auto-deleted as the session is still
valid.
+ *     This can wreak havoc with lock implementations.
+ * </p>
+ *
+ * <p>
+ *     When <code>guaranteed</code> is set, Curator will record failed node deletions
and
+ *     attempt to delete them in the background until successful. NOTE: you will still get
an
+ *     exception when the deletion fails. But, you can be assured that as long as the
+ *     {@link org.apache.curator.framework.CuratorFramework} instance is open attempts will
be made to delete the node.
+ * </p>
+ *  
+ * @return this
+ */
+public interface GuaranteeableDelete extends Guaranteeable<ChildrenDeletable>, BackgroundVersionable
+{
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
index 1123afd..3112eac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
@@ -6,7 +6,7 @@ import org.apache.zookeeper.Watcher.WatcherType;
  * Builder to allow the specification of whether it is acceptable to remove client side watch
information
  * in the case where ZK cannot be contacted. 
  */
-public interface RemoveWatchesType extends RemoveWatchesLocal
+public interface RemoveWatchesType extends RemoveWatchesLocal, Guaranteeable<BackgroundPathableQuietly<Void>>
 {
    
     /**

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 5caff7d..b4a1d93 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -72,6 +72,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
     private final byte[] defaultData;
     private final FailedDeleteManager failedDeleteManager;
+    private final FailedRemoveWatchManager failedRemoveWatcherManager;
     private final CompressionProvider compressionProvider;
     private final ACLProvider aclProvider;
     private final NamespaceFacadeCache namespaceFacadeCache;
@@ -147,6 +148,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
 
         failedDeleteManager = new FailedDeleteManager(this);
+        failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
         namespaceFacadeCache = new NamespaceFacadeCache(this);
     }
 
@@ -190,6 +192,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         connectionStateManager = parent.connectionStateManager;
         defaultData = parent.defaultData;
         failedDeleteManager = parent.failedDeleteManager;
+        failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
         compressionProvider = parent.compressionProvider;
         aclProvider = parent.aclProvider;
         namespaceFacadeCache = parent.namespaceFacadeCache;
@@ -487,6 +490,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         return failedDeleteManager;
     }
+    
+    FailedRemoveWatchManager getFailedRemoveWatcherManager()
+    {
+        return failedRemoveWatcherManager;
+    }    
 
     RetryLoop newRetryLoop()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 5d8b846..51691dd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -203,7 +203,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
                     @Override
                     public void retriesExhausted(OperationAndData<String> operationAndData)
                     {
-                        client.getFailedDeleteManager().addFailedDelete(unfixedPath);
+                        client.getFailedDeleteManager().addFailedOperation(unfixedPath);
                     }
                 };
             }
@@ -253,7 +253,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
             //Only retry a guaranteed delete if it's a retryable error
             if( RetryLoop.isRetryException(e) && guaranteed )
             {
-                client.getFailedDeleteManager().addFailedDelete(unfixedPath);
+                client.getFailedDeleteManager().addFailedOperation(unfixedPath);
             }
             throw e;
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
index deb7f40..934ae40 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
@@ -19,45 +19,18 @@
 package org.apache.curator.framework.imps;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-class FailedDeleteManager
+class FailedDeleteManager extends FailedOperationManager<String>
 {
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
-    
-    volatile FailedDeleteManagerListener debugListener = null;
-    
-    interface FailedDeleteManagerListener
-    {
-       public void pathAddedForDelete(String path);
-    }
-
     FailedDeleteManager(CuratorFramework client)
     {
-        this.client = client;
+        super(client);
     }
 
-    void addFailedDelete(String path)
+    @Override
+    protected void executeGuaranteedOperationInBackground(String path)
+            throws Exception
     {
-        if ( debugListener != null )
-        {
-            debugListener.pathAddedForDelete(path);
-        }
-        
-        
-        if ( client.getState() == CuratorFrameworkState.STARTED )
-        {
-            log.debug("Path being added to guaranteed delete set: " + path);
-            try
-            {
-                client.delete().guaranteed().inBackground().forPath(path);
-            }
-            catch ( Exception e )
-            {
-                addFailedDelete(path);
-            }
-        }
+        client.delete().guaranteed().inBackground().forPath(path);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
new file mode 100644
index 0000000..a1efde2
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class FailedOperationManager<T>
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    protected final CuratorFramework client;
+    
+    volatile FailedOperationManagerListener<T> debugListener = null;
+    
+    interface FailedOperationManagerListener<T>
+    {
+       public void pathAddedForGuaranteedOperation(T detail);
+    }
+
+    FailedOperationManager(CuratorFramework client)
+    {
+        this.client = client;
+    }
+
+    void addFailedOperation(T details)
+    {
+        if ( debugListener != null )
+        {
+            debugListener.pathAddedForGuaranteedOperation(details);
+        }
+        
+        
+        if ( client.getState() == CuratorFrameworkState.STARTED )
+        {
+            log.debug("Details being added to guaranteed operation set: " + details);
+            try
+            {
+                executeGuaranteedOperationInBackground(details);
+            }
+            catch ( Exception e )
+            {
+                addFailedOperation(details);
+            }
+        }
+    }
+    
+    protected abstract void executeGuaranteedOperationInBackground(T details) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
new file mode 100644
index 0000000..f954e2a
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.Watcher;
+
+class FailedRemoveWatchManager extends FailedOperationManager<FailedRemoveWatchManager.FailedRemoveWatchDetails>
+{
+    FailedRemoveWatchManager(CuratorFramework client)
+    {
+        super(client);
+    }
+
+    @Override
+    protected void executeGuaranteedOperationInBackground(FailedRemoveWatchDetails details)
+            throws Exception
+    {
+        if(details.watcher ==  null)
+        {
+            client.watches().removeAll().guaranteed().inBackground().forPath(details.path);
+        }
+        else
+        {
+            client.watches().remove(details.watcher).guaranteed().inBackground().forPath(details.path);
+        }
+    }
+    
+    static class FailedRemoveWatchDetails
+    {
+        public final String path;
+        public final Watcher watcher;
+        
+        public FailedRemoveWatchDetails(String path, Watcher watcher)
+        {
+            this.path = path;
+            this.watcher = watcher;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
index e5aecb2..f656ba1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
@@ -70,6 +70,14 @@ class NamespaceWatcherMap implements Closeable
     {
         return map.remove(key);
     }
+    
+    /**
+     * Remove all watchers for a given path
+     * @param path
+     */
+    void removeAllForPath(String path) {
+        
+    }
 
     @VisibleForTesting
     boolean isEmpty()

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index c9868f4..27d05da 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -27,8 +27,9 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
     private CuratorFrameworkImpl client;
     private Watcher watcher;
     private WatcherType watcherType;
+    private boolean guaranteed;
     private boolean local;
-    private boolean quietly;
+    private boolean quietly;    
     private Backgrounding backgrounding;
     
     public RemoveWatchesBuilderImpl(CuratorFrameworkImpl client)
@@ -36,6 +37,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
         this.client = client;
         this.watcher = null;
         this.watcherType = WatcherType.Any;
+        this.guaranteed = false;
         this.local = false;
         this.quietly = false;
         this.backgrounding = new Backgrounding();
@@ -44,14 +46,26 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder,
RemoveWat
     @Override
     public RemoveWatchesType remove(Watcher watcher)
     {
-        this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
+        if(watcher == null) {
+            this.watcher = null;
+        } else {
+            //Try and get the namespaced version of the watcher.
+            this.watcher = client.getNamespaceWatcherMap().get(watcher);
+            
+            //If this is not present then default to the original watcher. This shouldn't
happen in practice unless the user
+            //has added a watch directly to the ZK client rather than via the CuratorFramework.
+            if(this.watcher == null) {
+                this.watcher = watcher;
+            }
+        }
+
         return this;
     }
     
     @Override
     public RemoveWatchesType remove(CuratorWatcher watcher)
     {
-        this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
+        this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().get(watcher);
         return this;
     }    
 
@@ -111,6 +125,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder,
RemoveWat
         backgrounding = new Backgrounding(context);
         return this;
     }
+    
+    @Override
+    public RemoveWatchesLocal guaranteed()
+    {
+        guaranteed = true;
+        return this;
+    }    
 
     @Override
     public BackgroundPathableQuietly<Void> locally()
@@ -143,14 +164,23 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder,
RemoveWat
         return null;
     }    
     
-    private void pathInBackground(String path)
+    private void pathInBackground(final String path)
     {
-        OperationAndData.ErrorCallback<String>  errorCallback = null;        
+        OperationAndData.ErrorCallback<String>  errorCallback = new OperationAndData.ErrorCallback<String>()
+        {
+            @Override
+            public void retriesExhausted(OperationAndData<String> operationAndData)
+            {
+                client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path,
watcher));
+            }            
+        };        
         client.processBackgroundOperation(new OperationAndData<String>(this, path,
backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
     }
     
     private void pathInForeground(final String path) throws Exception
     {
+        //For the local case we don't want to use the normal retry loop and we don't want
to block until a connection is available.
+        //We just execute the removeWatch, and if it fails, ZK will just remove local watches.
         if(local)
         {
             ZooKeeper zkClient = client.getZooKeeper();
@@ -184,11 +214,21 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder,
RemoveWat
                                     zkClient.removeWatches(path, watcher, watcherType, local);
                                 }
                             }
-                            catch(KeeperException.NoWatcherException e)
+                            catch(Exception e)
                             {
-                                //Swallow this exception if the quietly flag is set, otherwise
rethrow.
-                                if(!quietly)
+                                if( RetryLoop.isRetryException(e) && guaranteed )
+                                {
+                                    //Setup the guaranteed handler
+                                    client.getFailedRemoveWatcherManager().addFailedOperation(new
FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
+                                    throw e;
+                                }
+                                else if(e instanceof KeeperException.NoWatcherException &&
quietly)
+                                {
+                                    //Ignore
+                                }
+                                else
                                 {
+                                    //Rethrow
                                     throw e;
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
index 6599745..943529f 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.imps.FailedDeleteManager.FailedDeleteManagerListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -291,11 +290,11 @@ public class TestFailedDeleteManager extends BaseClassForTests
         
         final AtomicBoolean pathAdded = new AtomicBoolean(false);
         
-        ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener()
+        ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<String>()
         {
             
             @Override
-            public void pathAddedForDelete(String path)
+            public void pathAddedForGuaranteedOperation(String path)
             {
                 pathAdded.set(true);
             }
@@ -325,11 +324,11 @@ public class TestFailedDeleteManager extends BaseClassForTests
         
         final AtomicBoolean pathAdded = new AtomicBoolean(false);
         
-        ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener()
+        ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<String>()
         {
             
             @Override
-            public void pathAddedForDelete(String path)
+            public void pathAddedForGuaranteedOperation(String path)
             {
                 pathAdded.set(true);
             }

http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
index 0912c70..518f13b 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -11,6 +11,10 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.api.CuratorListener;
 import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.FailedRemoveWatchManager.FailedRemoveWatchDetails;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
@@ -438,6 +442,131 @@ public class TestRemoveWatches extends BaseClassForTests
         }
     }
     
+    @Test
+    public void testGuaranteedRemoveWatch() throws Exception {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+        try
+        {
+            client.start();
+            
+            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            final CountDownLatch suspendedLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if(newState == ConnectionState.SUSPENDED)
+                    {
+                        suspendedLatch.countDown();
+                    }
+                    else if(newState == ConnectionState.RECONNECTED)
+                    {
+                        reconnectedLatch.countDown();
+                    }
+                }
+            });
+            
+            String path = "/";
+            
+            CountDownLatch removeLatch = new CountDownLatch(1);
+            
+            Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
           
+            client.checkExists().usingWatcher(watcher).forPath(path);
+            
+            server.stop();           
+            timing.awaitLatch(suspendedLatch);
+            
+            //Remove the watch while we're not connected
+            try 
+            {
+                client.watches().remove(watcher).guaranteed().forPath(path);
+                Assert.fail();
+            }
+            catch(KeeperException.ConnectionLossException e)
+            {
+                //Expected
+            }
+            
+            server.restart();
+            
+            timing.awaitLatch(removeLatch);            
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+    
+    @Test
+    public void testGuaranteedRemoveWatchInBackground() throws Exception {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(),
+                                                                    new ExponentialBackoffRetry(100,
3));
+        try
+        {
+            client.start();
+            
+            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            final CountDownLatch suspendedLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if(newState == ConnectionState.SUSPENDED)
+                    {
+                        suspendedLatch.countDown();
+                    }
+                    else if(newState == ConnectionState.RECONNECTED)
+                    {
+                        reconnectedLatch.countDown();
+                    }
+                }
+            });
+            
+            final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1);
+            
+            ((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener
= new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>()
+            {
+
+                @Override
+                public void pathAddedForGuaranteedOperation(
+                        FailedRemoveWatchDetails detail)
+                {
+                    guaranteeAddedLatch.countDown();
+                }
+            };
+            
+            String path = "/";
+            
+            CountDownLatch removeLatch = new CountDownLatch(1);
+            
+            Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
           
+            client.checkExists().usingWatcher(watcher).forPath(path);
+            
+            server.stop();           
+            timing.awaitLatch(suspendedLatch);
+            
+            //Remove the watch while we're not connected
+            client.watches().remove(watcher).guaranteed().inBackground().forPath(path);
+            
+            timing.awaitLatch(guaranteeAddedLatch);
+            
+            server.restart();
+            
+            timing.awaitLatch(removeLatch);            
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }    
+    
     private static class CountDownWatcher implements Watcher {
         private String path;
         private EventType eventType;


Mime
View raw message