curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [2/2] git commit: Moved the connection blocking code into ConnectionManager. It's cleaner and doesn't require a connection state listener
Date Mon, 16 Jun 2014 19:36:35 GMT
Moved the connection blocking code into ConnectionManager. It's cleaner and doesn't require
a connection state listener


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/04cefb47
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/04cefb47
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/04cefb47

Branch: refs/heads/CURATOR-110
Commit: 04cefb47f18c9d4bd3a0eb897563dd5abb7c89c8
Parents: 1c94c7e
Author: randgalt <randgalt@apache.org>
Authored: Mon Jun 16 14:35:54 2014 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Mon Jun 16 14:35:54 2014 -0500

----------------------------------------------------------------------
 .../framework/imps/CuratorFrameworkImpl.java    | 289 +++++++-----------
 .../framework/state/ConnectionStateManager.java |  38 ++-
 .../framework/imps/TestBlockUntilConnected.java | 304 +++++++++----------
 .../recipes/AfterConnectionEstablished.java     |  14 +-
 .../framework/recipes/leader/LeaderLatch.java   |  21 +-
 .../recipes/leader/TestLeaderLatch.java         |  46 +--
 6 files changed, 336 insertions(+), 376 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 14473d8..23a3248 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-
 import org.apache.curator.CuratorConnectionLossException;
 import org.apache.curator.CuratorZookeeperClient;
 import org.apache.curator.RetryLoop;
@@ -44,7 +44,6 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Arrays;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -59,40 +58,40 @@ import java.util.concurrent.atomic.AtomicReference;
 public class CuratorFrameworkImpl implements CuratorFramework
 {
 
-    private final Logger                                                log = LoggerFactory.getLogger(getClass());
-    private final CuratorZookeeperClient                                client;
-    private final ListenerContainer<CuratorListener>                    listeners;
-    private final ListenerContainer<UnhandledErrorListener>             unhandledErrorListeners;
-    private final ThreadFactory                                         threadFactory;
-    private final BlockingQueue<OperationAndData<?>>                    backgroundOperations;
-    private final NamespaceImpl                                         namespace;
-    private final ConnectionStateManager                                connectionStateManager;
-    private final AtomicReference<AuthInfo>                             authInfo =
new AtomicReference<AuthInfo>();
-    private final byte[]                                                defaultData;
-    private final FailedDeleteManager                                   failedDeleteManager;
-    private final CompressionProvider                                   compressionProvider;
-    private final ACLProvider                                           aclProvider;
-    private final NamespaceFacadeCache                                  namespaceFacadeCache;
-    private final NamespaceWatcherMap                                   namespaceWatcherMap
= new NamespaceWatcherMap(this);
-    private final Object												connectionLock = new Object();
-
-    private volatile ExecutorService                                    executorService;
-    private final AtomicBoolean                                         logAsErrorConnectionErrors
= new AtomicBoolean(false);
-
-    private static final boolean                                        LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL
= !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL);
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorZookeeperClient client;
+    private final ListenerContainer<CuratorListener> listeners;
+    private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
+    private final ThreadFactory threadFactory;
+    private final BlockingQueue<OperationAndData<?>> backgroundOperations;
+    private final NamespaceImpl namespace;
+    private final ConnectionStateManager connectionStateManager;
+    private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
+    private final byte[] defaultData;
+    private final FailedDeleteManager failedDeleteManager;
+    private final CompressionProvider compressionProvider;
+    private final ACLProvider aclProvider;
+    private final NamespaceFacadeCache namespaceFacadeCache;
+    private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
+
+    private volatile ExecutorService executorService;
+    private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
+
+    private static final boolean LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL = !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL);
 
     interface DebugBackgroundListener
     {
-        void        listen(OperationAndData<?> data);
+        void listen(OperationAndData<?> data);
     }
-    volatile DebugBackgroundListener        debugListener = null;
 
-    private final AtomicReference<CuratorFrameworkState>                    state;
+    volatile DebugBackgroundListener debugListener = null;
+
+    private final AtomicReference<CuratorFrameworkState> state;
 
     private static class AuthInfo
     {
-        final String    scheme;
-        final byte[]    auth;
+        final String scheme;
+        final byte[] auth;
 
         private AuthInfo(String scheme, byte[] auth)
         {
@@ -113,37 +112,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
     public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
     {
         ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
-        this.client = new CuratorZookeeperClient
-        (
-            localZookeeperFactory,
-            builder.getEnsembleProvider(),
-            builder.getSessionTimeoutMs(),
-            builder.getConnectionTimeoutMs(),
-            new Watcher()
+        this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
+        {
+            @Override
+            public void process(WatchedEvent watchedEvent)
             {
-                @Override
-                public void process(WatchedEvent watchedEvent)
-                {
-                    CuratorEvent event = new CuratorEventImpl
-                    (
-                        CuratorFrameworkImpl.this,
-                        CuratorEventType.WATCHED,
-                        watchedEvent.getState().getIntValue(),
-                        unfixForNamespace(watchedEvent.getPath()),
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        watchedEvent,
-                        null
-                    );
-                    processEvent(event);
-                }
-            },
-            builder.getRetryPolicy(),
-            builder.canBeReadOnly()
-        );
+                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED,
watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null,
null, null, null, watchedEvent, null);
+                processEvent(event);
+            }
+        }, builder.getRetryPolicy(), builder.canBeReadOnly());
 
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
@@ -155,7 +132,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
 
-        byte[]      builderDefaultData = builder.getDefaultData();
+        byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length)
: new byte[0];
 
         if ( builder.getAuthScheme() != null )
@@ -165,23 +142,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
         failedDeleteManager = new FailedDeleteManager(this);
         namespaceFacadeCache = new NamespaceFacadeCache(this);
-        
-        //Add callback handler to determine connection state transitions
-    	getConnectionStateListenable().addListener(new ConnectionStateListener()
-    	{
-			
-			@Override
-			public void stateChanged(CuratorFramework client, ConnectionState newState)
-			{
-				if(newState.isConnected())
-				{
-					synchronized(connectionLock)
-					{
-						connectionLock.notifyAll();
-					}
-				}
-			}
-		});
     }
 
     private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
@@ -253,7 +213,19 @@ public class CuratorFrameworkImpl implements CuratorFramework
     }
 
     @Override
-    public void     start()
+    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+    {
+        return connectionStateManager.blockUntilConnected(maxWaitTime, units);
+    }
+
+    @Override
+    public void blockUntilConnected() throws InterruptedException
+    {
+        blockUntilConnected(0, null);
+    }
+
+    @Override
+    public void start()
     {
         log.info("Starting");
         if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)
)
@@ -266,7 +238,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         try
         {
             connectionStateManager.start(); // ordering dependency - must be called before
client.start()
-            
+
             final ConnectionStateListener listener = new ConnectionStateListener()
             {
                 @Override
@@ -278,16 +250,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
                     }
                 }
             };
-            
+
             this.getConnectionStateListenable().addListener(listener);
 
             client.start();
 
             executorService = Executors.newFixedThreadPool(2, threadFactory);  // 1 for listeners,
1 for background ops
 
-            executorService.submit
-            (
-                new Callable<Object>()
+            executorService.submit(new Callable<Object>()
                 {
                     @Override
                     public Object call() throws Exception
@@ -295,8 +265,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
                         backgroundOperationsLoop();
                         return null;
                     }
-                }
-            );
+                });
         }
         catch ( Exception e )
         {
@@ -305,31 +274,28 @@ public class CuratorFrameworkImpl implements CuratorFramework
     }
 
     @Override
-    public void     close()
+    public void close()
     {
         log.debug("Closing");
         if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED)
)
         {
-            listeners.forEach
-                (
-                    new Function<CuratorListener, Void>()
+            listeners.forEach(new Function<CuratorListener, Void>()
+                {
+                    @Override
+                    public Void apply(CuratorListener listener)
                     {
-                        @Override
-                        public Void apply(CuratorListener listener)
+                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this,
CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
+                        try
+                        {
+                            listener.eventReceived(CuratorFrameworkImpl.this, event);
+                        }
+                        catch ( Exception e )
                         {
-                            CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this,
CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
-                            try
-                            {
-                                listener.eventReceived(CuratorFrameworkImpl.this, event);
-                            }
-                            catch ( Exception e )
-                            {
-                                log.error("Exception while sending Closing event", e);
-                            }
-                            return null;
+                            log.error("Exception while sending Closing event", e);
                         }
+                        return null;
                     }
-                );
+                });
 
             listeners.clear();
             unhandledErrorListeners.clear();
@@ -515,14 +481,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData,
CuratorEvent event)
     {
-        boolean     isInitialExecution = (event == null);
+        boolean isInitialExecution = (event == null);
         if ( isInitialExecution )
         {
             performBackgroundOperation(operationAndData);
             return;
         }
 
-        boolean     doQueueOperation = false;
+        boolean doQueueOperation = false;
         do
         {
             if ( RetryLoop.shouldRetry(event.getResultCode()) )
@@ -538,7 +504,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
 
             processEvent(event);
-        } while ( false );
+        }
+        while ( false );
 
         if ( doQueueOperation )
         {
@@ -560,7 +527,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
         if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) || !(e instanceof
KeeperException) )
         {
-            if ( e instanceof KeeperException.ConnectionLossException || e instanceof CuratorConnectionLossException
) {
+            if ( e instanceof KeeperException.ConnectionLossException )
+            {
                 if ( LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL || logAsErrorConnectionErrors.compareAndSet(true,
false) )
                 {
                     log.error(reason, e);
@@ -576,27 +544,24 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
         }
 
-        final String        localReason = reason;
-        unhandledErrorListeners.forEach
-            (
-                new Function<UnhandledErrorListener, Void>()
+        final String localReason = reason;
+        unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()
+            {
+                @Override
+                public Void apply(UnhandledErrorListener listener)
                 {
-                    @Override
-                    public Void apply(UnhandledErrorListener listener)
-                    {
-                        listener.unhandledError(localReason, e);
-                        return null;
-                    }
+                    listener.unhandledError(localReason, e);
+                    return null;
                 }
-            );
+            });
     }
 
-    String    unfixForNamespace(String path)
+    String unfixForNamespace(String path)
     {
         return namespace.unfixForNamespace(path);
     }
 
-    String    fixForNamespace(String path)
+    String fixForNamespace(String path)
     {
         return namespace.fixForNamespace(path);
     }
@@ -723,8 +688,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 sendToBackgroundCallback(operationAndData, event);
             }
 
-            KeeperException.Code    code = KeeperException.Code.get(event.getResultCode());
-            Exception               e = null;
+            KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
+            Exception e = null;
             try
             {
                 e = (code != null) ? KeeperException.create(code) : null;
@@ -755,7 +720,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
     }
 
-    private<DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE>
operationAndData, Throwable e)
+    private <DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE>
operationAndData, Throwable e)
     {
         do
         {
@@ -788,14 +753,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
 
             logError("Background exception was not retry-able or retry gave up", e);
-        } while ( false );
+        }
+        while ( false );
     }
 
     private void backgroundOperationsLoop()
     {
         while ( !Thread.interrupted() )
         {
-            OperationAndData<?>         operationAndData;
+            OperationAndData<?> operationAndData;
             try
             {
                 operationAndData = backgroundOperations.take();
@@ -850,7 +816,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 }
                 else
                 {
-                	logError("Background retry gave up", e);
+                    logError("Background retry gave up", e);
                 }
             }
             else
@@ -867,70 +833,23 @@ public class CuratorFrameworkImpl implements CuratorFramework
             validateConnection(curatorEvent.getWatchedEvent().getState());
         }
 
-        listeners.forEach
-        (
-            new Function<CuratorListener, Void>()
+        listeners.forEach(new Function<CuratorListener, Void>()
+        {
+            @Override
+            public Void apply(CuratorListener listener)
             {
-                @Override
-                public Void apply(CuratorListener listener)
+                try
                 {
-                    try
-                    {
-                        TimeTrace trace = client.startTracer("EventListener");
-                        listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
-                        trace.commit();
-                    }
-                    catch ( Exception e )
-                    {
-                        logError("Event listener threw exception", e);
-                    }
-                    return null;
+                    TimeTrace trace = client.startTracer("EventListener");
+                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
+                    trace.commit();
                 }
+                catch ( Exception e )
+                {
+                    logError("Event listener threw exception", e);
+                }
+                return null;
             }
-        );
-    }
-    
-    @Override
-    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
-    {
-    	//Check if we're already connected
-    	ConnectionState currentConnectionState = connectionStateManager.getCurrentConnectionState();
-    	if(currentConnectionState != null && currentConnectionState.isConnected())
-    	{
-    		return true;
-    	}
-    	
-    	long startTime = System.currentTimeMillis();
-    	long maxWaitTimeMS = TimeUnit.MILLISECONDS.convert(maxWaitTime, units);
-    	
-    	for(;;)
-    	{
-    		synchronized(connectionLock)
-    		{
-    			currentConnectionState = connectionStateManager.getCurrentConnectionState();
-    	    	if(currentConnectionState != null && currentConnectionState.isConnected())
-    			{
-    				return true;
-    			}
-    			
-    			long waitTime = 0;
-    			if(maxWaitTime > 0)
-    			{
-    				waitTime = maxWaitTimeMS  - (System.currentTimeMillis() - startTime);
-    	   			
-    				//Timeout
-        			if(waitTime <= 0)
-        			{
-        				return false;
-        			}    					
-    			}
-    		
-    			connectionLock.wait(waitTime);
-    		}
-    	}
-    }
-    
-    public void blockUntilConnected() throws InterruptedException {
-    	blockUntilConnected(0, TimeUnit.SECONDS);
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index ba29994..fb312dc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -153,6 +154,7 @@ public class ConnectionStateManager implements Closeable
 
         currentConnectionState = ConnectionState.SUSPENDED;
         postState(ConnectionState.SUSPENDED);
+
         return true;
     }
 
@@ -188,15 +190,45 @@ public class ConnectionStateManager implements Closeable
 
         return true;
     }
-    
-    public synchronized ConnectionState getCurrentConnectionState()
+
+    public synchronized boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws
InterruptedException
+    {
+        boolean hasMaxWait = (units != null);
+        long startTime = System.currentTimeMillis();
+
+        while ( !isConnected() )
+        {
+            long maxWaitTimeMS = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWaitTime,
units) : 0;
+
+            if ( hasMaxWait )
+            {
+                long waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime);
+                if ( waitTime <= 0 )
+                {
+                    return isConnected();
+                }
+
+                wait(waitTime);
+            }
+            else
+            {
+                wait();
+            }
+        }
+        return isConnected();
+    }
+
+    public synchronized boolean isConnected()
     {
-    	return currentConnectionState;
+        return (currentConnectionState != null) && currentConnectionState.isConnected();
     }
 
     private void postState(ConnectionState state)
     {
         log.info("State change: " + state);
+
+        notifyAll();
+
         while ( !eventQueue.offer(state) )
         {
             eventQueue.poll();

http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index 8dfb7d8..f649afb 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -16,12 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.framework.imps;
 
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+package org.apache.curator.framework.imps;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -34,202 +30,206 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 public class TestBlockUntilConnected extends BaseClassForTests
 {
-	/**
-	 * Test the case where we're already connected
-	 */
-	@Test
-	public void testBlockUntilConnectedCurrentlyConnected()
-	{		
-		Timing timing = new Timing();
+    /**
+     * Test the case where we're already connected
+     */
+    @Test
+    public void testBlockUntilConnectedCurrentlyConnected() throws Exception
+    {
+        Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
-        
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
+
         try
         {
-        	final CountDownLatch connectedLatch = new CountDownLatch(1);
-        	client.getConnectionStateListenable().addListener(new ConnectionStateListener()
-        	{
-				
-				@Override
-				public void stateChanged(CuratorFramework client, ConnectionState newState)
-				{
-					if(newState.isConnected())
-					{
-						connectedLatch.countDown();
-					}
-				}
-			});
-        	
-        	client.start();
-        	
-        	Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
-        	Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState.isConnected() )
+                    {
+                        connectedLatch.countDown();
+                    }
+                }
+            });
+
+            client.start();
+
+            Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
+            Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
         }
-        catch(InterruptedException e)
+        catch ( InterruptedException e )
         {
-        	Assert.fail("Unexpected interruption");
+            Assert.fail("Unexpected interruption");
         }
         finally
         {
-        	CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(client);
         }
-	}
-	
-	/**
-	 * Test the case where we are not currently connected and never have been
-	 */
-	@Test
-	public void testBlockUntilConnectedCurrentlyNeverConnected()
-	{		
+    }
+
+    /**
+     * Test the case where we are not currently connected and never have been
+     */
+    @Test
+    public void testBlockUntilConnectedCurrentlyNeverConnected()
+    {
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
-        
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
+
         try
         {
-        	client.start();
-        	Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+            client.start();
+            Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
         }
-        catch(InterruptedException e)
+        catch ( InterruptedException e )
         {
-        	Assert.fail("Unexpected interruption");
+            Assert.fail("Unexpected interruption");
         }
         finally
         {
-        	CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(client);
         }
-	}
-	
-	/**
-	 * Test the case where we are not currently connected, but have been previously
-	 */
-	@Test
-	public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
-	{		
-		Timing timing = new Timing();
+    }
+
+    /**
+     * Test the case where we are not currently connected, but have been previously
+     */
+    @Test
+    public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
+    {
+        Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
-        
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
+
         final CountDownLatch lostLatch = new CountDownLatch(1);
         client.getConnectionStateListenable().addListener(new ConnectionStateListener()
         {
-			
-			@Override
-			public void stateChanged(CuratorFramework client, ConnectionState newState)
-			{
-				if(newState == ConnectionState.LOST)
-				{
-					lostLatch.countDown();
-				}
-			}
-		});
-        
+
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            }
+        });
+
         try
         {
-        	client.start();
-        	
-        	//Block until we're connected
-        	Assert.assertTrue(client.blockUntilConnected(5,  TimeUnit.SECONDS), "Failed to connect");
-        	
-        	//Kill the server
-        	CloseableUtils.closeQuietly(server);
-        	
-        	//Wait until we hit the lost state
-        	Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
-        	
-        	server = new TestingServer(server.getPort(), server.getTempDirectory());
-        	
-        	Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+            client.start();
+
+            //Block until we're connected
+            Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to
connect");
+
+            //Kill the server
+            CloseableUtils.closeQuietly(server);
+
+            //Wait until we hit the lost state
+            Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
+
+            server = new TestingServer(server.getPort(), server.getTempDirectory());
+
+            Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
         }
-        catch(Exception e)
+        catch ( Exception e )
         {
-        	Assert.fail("Unexpected exception " + e);
+            Assert.fail("Unexpected exception " + e);
         }
         finally
         {
-        	CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(client);
         }
-	}	
-	
-	/**
-	 * Test the case where we are not currently connected and time out before a
-	 * connection becomes available.
-	 */
-	@Test
-	public void testBlockUntilConnectedConnectTimeout()
-	{	
-		//Kill the server
-		CloseableUtils.closeQuietly(server);
-		
+    }
+
+    /**
+     * Test the case where we are not currently connected and time out before a
+     * connection becomes available.
+     */
+    @Test
+    public void testBlockUntilConnectedConnectTimeout()
+    {
+        //Kill the server
+        CloseableUtils.closeQuietly(server);
+
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
-        
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
+
         try
         {
-        	client.start();
-        	Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS),
-        					   "Connected");
+            client.start();
+            Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS), "Connected");
         }
-        catch(InterruptedException e)
+        catch ( InterruptedException e )
         {
-        	Assert.fail("Unexpected interruption");
+            Assert.fail("Unexpected interruption");
         }
         finally
         {
-        	CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(client);
         }
-	}
-	
-	/**
-	 * Test the case where we are not currently connected and the thread gets interrupted
-	 * prior to a connection becoming available
-	 */
-	@Test
-	public void testBlockUntilConnectedInterrupt()
-	{	
-		//Kill the server
-		CloseableUtils.closeQuietly(server);
-		
+    }
+
+    /**
+     * Test the case where we are not currently connected and the thread gets interrupted
+     * prior to a connection becoming available
+     */
+    @Test
+    public void testBlockUntilConnectedInterrupt()
+    {
+        //Kill the server
+        CloseableUtils.closeQuietly(server);
+
         final CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
-        
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
+
         try
         {
-        	client.start();
-        	
-        	final Thread threadToInterrupt = Thread.currentThread();
-        	
-        	Timer timer = new Timer();
-        	timer.schedule(new TimerTask() {
-				
-				@Override
-				public void run() {
-					threadToInterrupt.interrupt();
-				}
-			}, 3000);
-        	
-        	client.blockUntilConnected(5, TimeUnit.SECONDS);
-        	Assert.fail("Expected interruption did not occur");
+            client.start();
+
+            final Thread threadToInterrupt = Thread.currentThread();
+
+            Timer timer = new Timer();
+            timer.schedule(new TimerTask()
+            {
+
+                @Override
+                public void run()
+                {
+                    threadToInterrupt.interrupt();
+                }
+            }, 3000);
+
+            client.blockUntilConnected(5, TimeUnit.SECONDS);
+            Assert.fail("Expected interruption did not occur");
         }
-        catch(InterruptedException e)
+        catch ( InterruptedException e )
         {
-        	//This is expected
+            //This is expected
         }
         finally
         {
-        	CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(client);
         }
-	}	
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
index f37f7c0..41ba702 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -39,24 +38,23 @@ public class AfterConnectionEstablished
      * @param client             The curator client
      * @param runAfterConnection The logic to run
      */
-    public static <T> T execute(final CuratorFramework client, final Callable<T>
runAfterConnection) throws Exception
+    public static void execute(final CuratorFramework client, final Runnable runAfterConnection)
throws Exception
     {
         //Block until connected
-        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
-        Callable<T> internalCall = new Callable<T>()
+        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
+        Runnable internalCall = new Runnable()
         {
             @Override
-            public T call() throws Exception
+            public void run()
             {
                 try
                 {
                     client.blockUntilConnected();
-                    return runAfterConnection.call();
+                    runAfterConnection.run();
                 }
                 catch ( Exception e )
                 {
                     log.error("An error occurred blocking until a connection is available",
e);
-                    throw e;
                 }
                 finally
                 {
@@ -64,7 +62,7 @@ public class AfterConnectionEstablished
                 }
             }
         };
-        return executor.submit(internalCall).get();
+        executor.submit(internalCall);
     }
 
     private AfterConnectionEstablished()

http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index f4c1cef..dce3f5e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -26,7 +26,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished;
+import org.apache.curator.framework.recipes.AfterConnectionEstablished;
 import org.apache.curator.framework.recipes.locks.LockInternals;
 import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -156,17 +156,22 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot
be started more than once");
 
-        ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground
+        AfterConnectionEstablished.execute
         (
-            client,
-            new Callable<Void>()
+            client, new Runnable()
             {
                 @Override
-                public Void call() throws Exception
+                public void run()
                 {
                     client.getConnectionStateListenable().addListener(listener);
-                    reset();
-                    return null;
+                    try
+                    {
+                        reset();
+                    }
+                    catch ( Exception e )
+                    {
+                        log.error("An error occurred checking resetting leadership.", e);
+                    }
                 }
             }
         );
@@ -556,7 +561,7 @@ public class LeaderLatch implements Closeable
 
     private void handleStateChange(ConnectionState newState)
     {
-        if (newState.isConnected())
+        if ( newState == ConnectionState.RECONNECTED )
         {
             try
             {

http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 35d8809..b97e708 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -22,7 +22,6 @@ package org.apache.curator.framework.recipes.leader;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
@@ -35,7 +34,6 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -215,6 +213,17 @@ public class TestLeaderLatch extends BaseClassForTests
     @Test
     public void testWaiting() throws Exception
     {
+        final int LOOPS = 10;
+        for ( int i = 0; i < LOOPS; ++i )
+        {
+            System.out.println("TRY #" + i);
+            internalTestWaitingOnce();
+            Thread.sleep(10);
+        }
+    }
+
+    private void internalTestWaitingOnce() throws Exception
+    {
         final int PARTICIPANT_QTY = 10;
 
         ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
@@ -241,10 +250,10 @@ public class TestLeaderLatch extends BaseClassForTests
                             Assert.assertTrue(latch.await(timing.forWaiting().seconds(),
TimeUnit.SECONDS));
                             Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
                             Thread.sleep((int)(10 * Math.random()));
+                            thereIsALeader.set(false);
                         }
                         finally
                         {
-                            thereIsALeader.set(false);
                             latch.close();
                         }
                         return null;
@@ -259,7 +268,7 @@ public class TestLeaderLatch extends BaseClassForTests
         }
         finally
         {
-            executorService.shutdown();
+            executorService.shutdownNow();
             CloseableUtils.closeQuietly(client);
         }
     }
@@ -526,23 +535,21 @@ public class TestLeaderLatch extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
-    
+
     @Test
     public void testNoServerAtStart()
-        {
+    {
         CloseableUtils.closeQuietly(server);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryNTimes(5, 1000));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryNTimes(5, 1000));
 
         client.start();
 
         final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
         final CountDownLatch leaderCounter = new CountDownLatch(1);
         final AtomicInteger leaderCount = new AtomicInteger(0);
-        final AtomicInteger notLeaderCount = new AtomicInteger(0);        
+        final AtomicInteger notLeaderCount = new AtomicInteger(0);
         leader.addListener(new LeaderLatchListener()
         {
             @Override
@@ -555,27 +562,26 @@ public class TestLeaderLatch extends BaseClassForTests
             @Override
             public void notLeader()
             {
-            	notLeaderCount.incrementAndGet();
+                notLeaderCount.incrementAndGet();
             }
 
         });
 
         try
         {
-        	leader.start();
-         
-        	//Wait for a while before starting the test server
-        	Thread.sleep(5000);
+            leader.start();
+
+            timing.sleepABit();
 
             // Start the new server
-            server = new TestingServer(server.getPort());
+            server = new TestingServer(server.getPort(), server.getTempDirectory());
 
             Assert.assertTrue(timing.awaitLatch(leaderCounter), "Not elected leader");
-            
+
             Assert.assertEquals(leaderCount.get(), 1, "Elected too many times");
-            Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");   
        
+            Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");
         }
-        catch (Exception e)
+        catch ( Exception e )
         {
             Assert.fail("Unexpected exception", e);
         }
@@ -585,7 +591,7 @@ public class TestLeaderLatch extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
             CloseableUtils.closeQuietly(server);
         }
-    }    
+    }
 
     private enum Mode
     {


Mime
View raw message