curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cammcken...@apache.org
Subject curator git commit: CURATOR-161 - Modified the background processing framework to allow operations to request that a live connection is not necessary to execute (this is needed to run the remove watches with 'local' set to true). Cleaned up some unit tests.
Date Wed, 13 May 2015 23:20:31 GMT
Repository: curator
Updated Branches:
  refs/heads/CURATOR-161 22d034af9 -> ba4da2c3c


CURATOR-161 - Modified the background processing framework to allow
operations to declare that a live connection is not required to execute
(this is needed to run the remove watches with 'local' set to true).
Cleaned up some unit tests.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ba4da2c3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ba4da2c3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ba4da2c3

Branch: refs/heads/CURATOR-161
Commit: ba4da2c3c7048ea249f18e7b4c815db76f0b1ad0
Parents: 22d034a
Author: Cameron McKenzie <cameron@unico.com.au>
Authored: Thu May 14 09:19:09 2015 +1000
Committer: Cameron McKenzie <cameron@unico.com.au>
Committed: Thu May 14 09:19:09 2015 +1000

----------------------------------------------------------------------
 .../framework/imps/CuratorFrameworkImpl.java    |  2 +-
 .../framework/imps/OperationAndData.java        | 16 ++++-
 .../imps/RemoveWatchesBuilderImpl.java          | 22 ++++--
 .../framework/imps/TestRemoveWatches.java       | 73 ++++++++------------
 4 files changed, 58 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index b4a1d93..c82f984 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -821,7 +821,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         try
         {
-            if ( client.isConnected() )
+            if ( !operationAndData.isConnectionRequired() || client.isConnected() )
             {
                 operationAndData.callPerformBackgroundOperation();
             }

http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 38f59a0..b46cddb 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -40,25 +40,37 @@ class OperationAndData<T> implements Delayed, RetrySleeper
     private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
     private final long ordinal = nextOrdinal.getAndIncrement();
     private final Object context;
+    private final boolean connectionRequired;
 
     interface ErrorCallback<T>
     {
         void retriesExhausted(OperationAndData<T> operationAndData);
     }
-
-    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback,
ErrorCallback<T> errorCallback, Object context)
+    
+    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback,
ErrorCallback<T> errorCallback, Object context, boolean connectionRequired)
     {
         this.operation = operation;
         this.data = data;
         this.callback = callback;
         this.errorCallback = errorCallback;
         this.context = context;
+        this.connectionRequired = connectionRequired;
+    }      
+
+    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback,
ErrorCallback<T> errorCallback, Object context)
+    {
+        this(operation, data, callback, errorCallback, context, true);
     }
 
     Object getContext()
     {
         return context;
     }
+    
+    boolean isConnectionRequired()
+    {
+        return connectionRequired;
+    }
 
     void callPerformBackgroundOperation() throws Exception
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index 27d05da..932706b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -166,15 +166,23 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder,
RemoveWat
     
     private void pathInBackground(final String path)
     {
-        OperationAndData.ErrorCallback<String>  errorCallback = new OperationAndData.ErrorCallback<String>()
+        OperationAndData.ErrorCallback<String>  errorCallback = null;
+        
+        //Only need an error callback if we're in guaranteed mode
+        if(guaranteed)
         {
-            @Override
-            public void retriesExhausted(OperationAndData<String> operationAndData)
+            errorCallback = new OperationAndData.ErrorCallback<String>()
             {
-                client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path,
watcher));
-            }            
-        };        
-        client.processBackgroundOperation(new OperationAndData<String>(this, path,
backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
+                @Override
+                public void retriesExhausted(OperationAndData<String> operationAndData)
+                {
+                    client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path,
watcher));
+                }            
+            };
+        }
+        
+        client.processBackgroundOperation(new OperationAndData<String>(this, path,
backgrounding.getCallback(),
+                                                                       errorCallback, backgrounding.getContext(),
!local), null);
     }
     
     private void pathInForeground(final String path) throws Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
index 518f13b..fc15f0c 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -1,8 +1,9 @@
 package org.apache.curator.framework.imps;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -24,13 +25,30 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestRemoveWatches extends BaseClassForTests
 {
+    private boolean blockUntilDesiredConnectionState(CuratorFramework client, Timing timing,
final ConnectionState desiredState)
+    {
+        final CountDownLatch latch = new CountDownLatch(1);
+        client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+        {
+            
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                if(newState == desiredState)
+                {
+                    latch.countDown();
+                }
+            }
+        });
+        
+        return timing.awaitLatch(latch);
+    }
+    
     @Test
     public void testRemoveCuratorDefaultWatcher() throws Exception
     {
@@ -330,7 +348,7 @@ public class TestRemoveWatches extends BaseClassForTests
             //Stop the server so we can check if we can remove watches locally when offline
             server.stop();
             
-            timing.sleepABit();
+            blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
                        
             client.watches().removeAll().locally().forPath(path);
             
@@ -364,7 +382,7 @@ public class TestRemoveWatches extends BaseClassForTests
             //Stop the server so we can check if we can remove watches locally when offline
             server.stop();
             
-            timing.sleepABit();
+            blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
                        
             client.watches().removeAll().locally().inBackground().forPath(path);
             
@@ -452,25 +470,7 @@ public class TestRemoveWatches extends BaseClassForTests
         try
         {
             client.start();
-            
-            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
-            final CountDownLatch suspendedLatch = new CountDownLatch(1);
-            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
-            {
-                @Override
-                public void stateChanged(CuratorFramework client, ConnectionState newState)
-                {
-                    if(newState == ConnectionState.SUSPENDED)
-                    {
-                        suspendedLatch.countDown();
-                    }
-                    else if(newState == ConnectionState.RECONNECTED)
-                    {
-                        reconnectedLatch.countDown();
-                    }
-                }
-            });
-            
+                       
             String path = "/";
             
             CountDownLatch removeLatch = new CountDownLatch(1);
@@ -479,7 +479,8 @@ public class TestRemoveWatches extends BaseClassForTests
             client.checkExists().usingWatcher(watcher).forPath(path);
             
             server.stop();           
-            timing.awaitLatch(suspendedLatch);
+            
+            blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
             
             //Remove the watch while we're not connected
             try 
@@ -510,25 +511,7 @@ public class TestRemoveWatches extends BaseClassForTests
         try
         {
             client.start();
-            
-            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
-            final CountDownLatch suspendedLatch = new CountDownLatch(1);
-            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
-            {
-                @Override
-                public void stateChanged(CuratorFramework client, ConnectionState newState)
-                {
-                    if(newState == ConnectionState.SUSPENDED)
-                    {
-                        suspendedLatch.countDown();
-                    }
-                    else if(newState == ConnectionState.RECONNECTED)
-                    {
-                        reconnectedLatch.countDown();
-                    }
-                }
-            });
-            
+                        
             final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1);
             
             ((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener
= new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>()
@@ -550,7 +533,7 @@ public class TestRemoveWatches extends BaseClassForTests
             client.checkExists().usingWatcher(watcher).forPath(path);
             
             server.stop();           
-            timing.awaitLatch(suspendedLatch);
+            blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
             
             //Remove the watch while we're not connected
             client.watches().remove(watcher).guaranteed().inBackground().forPath(path);


Mime
View raw message