curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [01/18] git commit: CURATOR-110 - Modified state handling to treat 'CONNECTED' and 'RECONNECTED' events in the same way. Added a test case for the leader latch being started before a connection to ZK has been established.
Date Tue, 17 Jun 2014 23:03:05 GMT
Repository: curator
Updated Branches:
  refs/heads/master 6e98562d8 -> 5d7d0c7f1


CURATOR-110 - Modified state handling to treat 'CONNECTED' and
'RECONNECTED' events in the same way. Added a test case for the leader
latch being started before a connection to ZK has been established.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a63a102
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a63a102
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a63a102

Branch: refs/heads/master
Commit: 1a63a102ebbaaead265babd71a3d52928f848bd0
Parents: de1d38c
Author: Cameron McKenzie <cameron@unico.com.au>
Authored: Mon Jun 2 16:30:26 2014 +1000
Committer: Cameron McKenzie <cameron@unico.com.au>
Committed: Mon Jun 2 16:30:26 2014 +1000

----------------------------------------------------------------------
 .../framework/state/ConnectionState.java        |  41 +-
 .../framework/recipes/leader/LeaderLatch.java   | 633 +++++++++----------
 .../recipes/leader/TestLeaderLatch.java         | 516 +++++++--------
 3 files changed, 601 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/1a63a102/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 566d355..b25ce38 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -23,18 +23,17 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 /**
  * Represents state changes in the connection to ZK
  */
-public enum ConnectionState
-{
+public enum ConnectionState {
     /**
-     * Sent for the first successful connection to the server. NOTE: You will only
-     * get one of these messages for any CuratorFramework instance.
+     * Sent for the first successful connection to the server. NOTE: You will
+     * only get one of these messages for any CuratorFramework instance.
      */
     CONNECTED,
 
     /**
      * There has been a loss of connection. Leaders, locks, etc. should suspend
-     * until the connection is re-established. If the connection times-out you will
-     * receive a {@link #LOST} notice
+     * until the connection is re-established. If the connection times-out you
+     * will receive a {@link #LOST} notice
      */
     SUSPENDED,
 
@@ -44,18 +43,30 @@ public enum ConnectionState
     RECONNECTED,
 
     /**
-     * The connection is confirmed to be lost. Close any locks, leaders, etc. and
-     * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
-     * state after this but you should still consider any locks, etc. as dirty/unstable
+     * The connection is confirmed to be lost. Close any locks, leaders, etc.
+     * and attempt to re-create them. NOTE: it is possible to get a
+     * {@link #RECONNECTED} state after this but you should still consider any
+     * locks, etc. as dirty/unstable
      */
     LOST,
 
     /**
-     * The connection has gone into read-only mode. This can only happen if you pass true
-     * for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}. See the ZooKeeper doc
-     * regarding read only connections:
-     * <a href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode">http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>.
-     * The connection will remain in read only mode until another state change is sent.
+     * The connection has gone into read-only mode. This can only happen if you
+     * pass true for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}.
+     * See the ZooKeeper doc regarding read only connections: <a
+     * href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode"
+     * >http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>. The
+     * connection will remain in read only mode until another state change is
+     * sent.
      */
-    READ_ONLY
+    READ_ONLY;
+
+    /**
+     * Check if this state indicates that Curator has a connection to ZooKeeper
+     * 
+     * @return True if connected, false otherwise
+     */
+    public boolean isConnected() {
+        return this == CONNECTED || this == RECONNECTED || this == READ_ONLY;
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/1a63a102/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 88456af..d09ed1b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -19,9 +19,17 @@
 
 package org.apache.curator.framework.recipes.leader;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -38,71 +46,60 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 
 /**
  * <p>
- * Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected to
- * a Zookeeper cluster. If a group of N thread/processes contend for leadership one will
- * randomly be assigned leader until it releases leadership at which time another one from the
- * group will randomly be chosen
+ * Abstraction to select a "leader" amongst multiple contenders in a group of
+ * JMVs connected to a Zookeeper cluster. If a group of N thread/processes
+ * contend for leadership one will randomly be assigned leader until it releases
+ * leadership at which time another one from the group will randomly be chosen
  * </p>
  */
-public class LeaderLatch implements Closeable
-{
+public class LeaderLatch implements Closeable {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final String latchPath;
     private final String id;
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final AtomicReference<State> state = new AtomicReference<State>(
+            State.LATENT);
     private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
     private final AtomicReference<String> ourPath = new AtomicReference<String>();
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
     private final CloseMode closeMode;
 
-    private final ConnectionStateListener listener = new ConnectionStateListener()
-    {
+    private final ConnectionStateListener listener = new ConnectionStateListener() {
         @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
+        public void stateChanged(CuratorFramework client,
+                ConnectionState newState) {
             handleStateChange(newState);
         }
     };
 
     private static final String LOCK_NAME = "latch-";
 
-    private static final LockInternalsSorter sorter = new LockInternalsSorter()
-    {
+    private static final LockInternalsSorter sorter = new LockInternalsSorter() {
         @Override
-        public String fixForSorting(String str, String lockName)
-        {
-            return StandardLockInternalsDriver.standardFixForSorting(str, lockName);
+        public String fixForSorting(String str, String lockName) {
+            return StandardLockInternalsDriver.standardFixForSorting(str,
+                    lockName);
         }
     };
 
-    public enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
+    public enum State {
+        LATENT, STARTED, CLOSED
     }
 
     /**
      * How to handle listeners when the latch is closed
      */
-    public enum CloseMode
-    {
+    public enum CloseMode {
         /**
-         * When the latch is closed, listeners will *not* be notified (default behavior)
+         * When the latch is closed, listeners will *not* be notified (default
+         * behavior)
          */
         SILENT,
 
@@ -113,105 +110,116 @@ public class LeaderLatch implements Closeable
     }
 
     /**
-     * @param client    the client
-     * @param latchPath the path for this leadership group
+     * @param client
+     *            the client
+     * @param latchPath
+     *            the path for this leadership group
      */
-    public LeaderLatch(CuratorFramework client, String latchPath)
-    {
+    public LeaderLatch(CuratorFramework client, String latchPath) {
         this(client, latchPath, "", CloseMode.SILENT);
     }
 
     /**
-     * @param client    the client
-     * @param latchPath the path for this leadership group
-     * @param id        participant ID
+     * @param client
+     *            the client
+     * @param latchPath
+     *            the path for this leadership group
+     * @param id
+     *            participant ID
      */
-    public LeaderLatch(CuratorFramework client, String latchPath, String id)
-    {
+    public LeaderLatch(CuratorFramework client, String latchPath, String id) {
         this(client, latchPath, id, CloseMode.SILENT);
     }
 
     /**
-     * @param client    the client
-     * @param latchPath the path for this leadership group
-     * @param id        participant ID
-     * @param closeMode behaviour of listener on explicit close.
+     * @param client
+     *            the client
+     * @param latchPath
+     *            the path for this leadership group
+     * @param id
+     *            participant ID
+     * @param closeMode
+     *            behaviour of listener on explicit close.
      */
-    public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
-    {
-        this.client = Preconditions.checkNotNull(client, "client cannot be null");
-        this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null");
+    public LeaderLatch(CuratorFramework client, String latchPath, String id,
+            CloseMode closeMode) {
+        this.client = Preconditions.checkNotNull(client,
+                "client cannot be null");
+        this.latchPath = Preconditions.checkNotNull(latchPath,
+                "mutexPath cannot be null");
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
-        this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
+        this.closeMode = Preconditions.checkNotNull(closeMode,
+                "closeMode cannot be null");
     }
 
     /**
-     * Add this instance to the leadership election and attempt to acquire leadership.
-     *
-     * @throws Exception errors
+     * Add this instance to the leadership election and attempt to acquire
+     * leadership.
+     * 
+     * @throws Exception
+     *             errors
      */
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+    public void start() throws Exception {
+        Preconditions.checkState(
+                state.compareAndSet(State.LATENT, State.STARTED),
+                "Cannot be started more than once");
 
         client.getConnectionStateListenable().addListener(listener);
         reset();
     }
 
     /**
-     * Remove this instance from the leadership election. If this instance is the leader, leadership
-     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
-     * instances must eventually be closed.
-     *
-     * @throws IOException errors
+     * Remove this instance from the leadership election. If this instance is
+     * the leader, leadership is released. IMPORTANT: the only way to release
+     * leadership is by calling close(). All LeaderLatch instances must
+     * eventually be closed.
+     * 
+     * @throws IOException
+     *             errors
      */
     @Override
-    public void close() throws IOException
-    {
+    public void close() throws IOException {
         close(closeMode);
     }
 
     /**
-     * Remove this instance from the leadership election. If this instance is the leader, leadership
-     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
-     * instances must eventually be closed.
-     *
-     * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
-     *
-     * @throws IOException errors
+     * Remove this instance from the leadership election. If this instance is
+     * the leader, leadership is released. IMPORTANT: the only way to release
+     * leadership is by calling close(). All LeaderLatch instances must
+     * eventually be closed.
+     * 
+     * @param closeMode
+     *            allows the default close mode to be overridden at the time the
+     *            latch is closed.
+     * 
+     * @throws IOException
+     *             errors
      */
-    public void close(CloseMode closeMode) throws IOException
-    {
-        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
+    public void close(CloseMode closeMode) throws IOException {
+        Preconditions.checkState(
+                state.compareAndSet(State.STARTED, State.CLOSED),
+                "Already closed or has not been started");
         Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
-        try
-        {
+        try {
             setNode(null);
-        }
-        catch ( Exception e )
-        {
+        } catch (Exception e) {
             throw new IOException(e);
-        }
-        finally
-        {
+        } finally {
             client.getConnectionStateListenable().removeListener(listener);
 
-            switch ( closeMode )
-            {
-                case NOTIFY_LEADER:
-                {
-                    setLeadership(false);
-                    listeners.clear();
-                    break;
-                }
+            switch (closeMode) {
+            case NOTIFY_LEADER: {
+                setLeadership(false);
+                listeners.clear();
+                break;
+            }
 
-                default:
-                {
-                    listeners.clear();
-                    setLeadership(false);
-                    break;
-                }
+            default: {
+                listeners.clear();
+                setLeadership(false);
+                break;
+            }
             }
         }
     }
@@ -219,134 +227,160 @@ public class LeaderLatch implements Closeable
     /**
      * Attaches a listener to this LeaderLatch
      * <p>
-     * Attaching the same listener multiple times is a noop from the second time on.
+     * Attaching the same listener multiple times is a noop from the second time
+     * on.
      * <p>
-     * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
-     * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
+     * All methods for the listener are run using the provided Executor. It is
+     * common to pass in a single-threaded executor so that you can be certain
+     * that listener methods are called in sequence, but if you are fine with
      * them being called out of order you are welcome to use multiple threads.
-     *
-     * @param listener the listener to attach
+     * 
+     * @param listener
+     *            the listener to attach
      */
-    public void addListener(LeaderLatchListener listener)
-    {
+    public void addListener(LeaderLatchListener listener) {
         listeners.addListener(listener);
     }
 
     /**
      * Attaches a listener to this LeaderLatch
      * <p>
-     * Attaching the same listener multiple times is a noop from the second time on.
+     * Attaching the same listener multiple times is a noop from the second time
+     * on.
      * <p>
-     * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
-     * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
+     * All methods for the listener are run using the provided Executor. It is
+     * common to pass in a single-threaded executor so that you can be certain
+     * that listener methods are called in sequence, but if you are fine with
      * them being called out of order you are welcome to use multiple threads.
-     *
-     * @param listener the listener to attach
-     * @param executor     An executor to run the methods for the listener on.
+     * 
+     * @param listener
+     *            the listener to attach
+     * @param executor
+     *            An executor to run the methods for the listener on.
      */
-    public void addListener(LeaderLatchListener listener, Executor executor)
-    {
+    public void addListener(LeaderLatchListener listener, Executor executor) {
         listeners.addListener(listener, executor);
     }
 
     /**
      * Removes a given listener from this LeaderLatch
-     *
-     * @param listener the listener to remove
+     * 
+     * @param listener
+     *            the listener to remove
      */
-    public void removeListener(LeaderLatchListener listener)
-    {
+    public void removeListener(LeaderLatchListener listener) {
         listeners.removeListener(listener);
     }
 
     /**
-     * <p>Causes the current thread to wait until this instance acquires leadership
-     * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
-     * <p>If this instance already is the leader then this method returns immediately.</p>
-     *
-     * <p>Otherwise the current
-     * thread becomes disabled for thread scheduling purposes and lies
-     * dormant until one of three things happen:</p>
+     * <p>
+     * Causes the current thread to wait until this instance acquires leadership
+     * unless the thread is {@linkplain Thread#interrupt interrupted} or
+     * {@linkplain #close() closed}.
+     * </p>
+     * <p>
+     * If this instance already is the leader then this method returns
+     * immediately.
+     * </p>
+     * 
+     * <p>
+     * Otherwise the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of three things happen:
+     * </p>
      * <ul>
      * <li>This instance becomes the leader</li>
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread</li>
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul>
-     * <p>If the current thread:</p>
+     * <p>
+     * If the current thread:
+     * </p>
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
      * </ul>
-     * <p>then {@link InterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.</p>
-     *
-     * @throws InterruptedException if the current thread is interrupted
-     *                              while waiting
-     * @throws EOFException         if the instance is {@linkplain #close() closed}
-     *                              while waiting
+     * <p>
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     * </p>
+     * 
+     * @throws InterruptedException
+     *             if the current thread is interrupted while waiting
+     * @throws EOFException
+     *             if the instance is {@linkplain #close() closed} while waiting
      */
-    public void await() throws InterruptedException, EOFException
-    {
-        synchronized(this)
-        {
-            while ( (state.get() == State.STARTED) && !hasLeadership.get() )
-            {
+    public void await() throws InterruptedException, EOFException {
+        synchronized (this) {
+            while ((state.get() == State.STARTED) && !hasLeadership.get()) {
                 wait();
             }
         }
-        if ( state.get() != State.STARTED )
-        {
+        if (state.get() != State.STARTED) {
             throw new EOFException();
         }
     }
 
     /**
-     * <p>Causes the current thread to wait until this instance acquires leadership
-     * unless the thread is {@linkplain Thread#interrupt interrupted},
-     * the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
-     *
-     * <p>If this instance already is the leader then this method returns immediately
-     * with the value {@code true}.</p>
-     *
-     * <p>Otherwise the current
-     * thread becomes disabled for thread scheduling purposes and lies
-     * dormant until one of four things happen:</p>
+     * <p>
+     * Causes the current thread to wait until this instance acquires leadership
+     * unless the thread is {@linkplain Thread#interrupt interrupted}, the
+     * specified waiting time elapses or the instance is {@linkplain #close()
+     * closed}.
+     * </p>
+     * 
+     * <p>
+     * If this instance already is the leader then this method returns
+     * immediately with the value {@code true}.
+     * </p>
+     * 
+     * <p>
+     * Otherwise the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of four things happen:
+     * </p>
      * <ul>
      * <li>This instance becomes the leader</li>
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread</li>
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread</li>
      * <li>The specified waiting time elapses.</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul>
-     *
-     * <p>If the current thread:</p>
+     * 
+     * <p>
+     * If the current thread:
+     * </p>
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
      * </ul>
-     * <p>then {@link InterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.</p>
-     *
-     * <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
-     * then the value {@code false} is returned.  If the time is less than or equal to zero, the method
-     * will not wait at all.</p>
-     *
-     * @param timeout the maximum time to wait
-     * @param unit    the time unit of the {@code timeout} argument
-     * @return {@code true} if the count reached zero and {@code false}
-     *         if the waiting time elapsed before the count reached zero or the instances was closed
-     * @throws InterruptedException if the current thread is interrupted
-     *                              while waiting
+     * <p>
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     * </p>
+     * 
+     * <p>
+     * If the specified waiting time elapses or the instance is
+     * {@linkplain #close() closed} then the value {@code false} is returned. If
+     * the time is less than or equal to zero, the method will not wait at all.
+     * </p>
+     * 
+     * @param timeout
+     *            the maximum time to wait
+     * @param unit
+     *            the time unit of the {@code timeout} argument
+     * @return {@code true} if the count reached zero and {@code false} if the
+     *         waiting time elapsed before the count reached zero or the
+     *         instances was closed
+     * @throws InterruptedException
+     *             if the current thread is interrupted while waiting
      */
-    public boolean await(long timeout, TimeUnit unit) throws InterruptedException
-    {
+    public boolean await(long timeout, TimeUnit unit)
+            throws InterruptedException {
         long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
 
-        synchronized(this)
-        {
-            while ( (waitNanos > 0) && (state.get() == State.STARTED) && !hasLeadership.get() )
-            {
+        synchronized (this) {
+            while ((waitNanos > 0) && (state.get() == State.STARTED)
+                    && !hasLeadership.get()) {
                 long startNanos = System.nanoTime();
                 TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
                 long elapsed = System.nanoTime() - startNanos;
@@ -358,24 +392,23 @@ public class LeaderLatch implements Closeable
 
     /**
      * Return this instance's participant Id
-     *
+     * 
      * @return participant Id
      */
-    public String getId()
-    {
+    public String getId() {
         return id;
     }
 
     /**
-     * Returns this instances current state, this is the only way to verify that the object has been closed before
-     * closing again.  If you try to close a latch multiple times, the close() method will throw an
-     * IllegalArgumentException which is often not caught and ignored (CloseableUtils.closeQuietly() only looks for
-     * IOException).
-     *
+     * Returns this instances current state, this is the only way to verify that
+     * the object has been closed before closing again. If you try to close a
+     * latch multiple times, the close() method will throw an
+     * IllegalArgumentException which is often not caught and ignored
+     * (CloseableUtils.closeQuietly() only looks for IOException).
+     * 
      * @return the state of the current instance
      */
-    public State getState()
-    {
+    public State getState() {
         return state.get();
     }
 
@@ -386,16 +419,17 @@ public class LeaderLatch implements Closeable
      * <p>
      * <p>
      * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     * return a value that does not match {@link #hasLeadership()} as hasLeadership
-     * uses a local field of the class.
+     * return a value that does not match {@link #hasLeadership()} as
+     * hasLeadership uses a local field of the class.
      * </p>
-     *
+     * 
      * @return participants
-     * @throws Exception ZK errors, interruptions, etc.
+     * @throws Exception
+     *             ZK errors, interruptions, etc.
      */
-    public Collection<Participant> getParticipants() throws Exception
-    {
-        Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+    public Collection<Participant> getParticipants() throws Exception {
+        Collection<String> participantNodes = LockInternals
+                .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
         return LeaderSelector.getParticipants(client, participantNodes);
     }
 
@@ -407,26 +441,26 @@ public class LeaderLatch implements Closeable
      * <p>
      * <p>
      * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     * return a value that does not match {@link #hasLeadership()} as hasLeadership
-     * uses a local field of the class.
+     * return a value that does not match {@link #hasLeadership()} as
+     * hasLeadership uses a local field of the class.
      * </p>
-     *
+     * 
      * @return leader
-     * @throws Exception ZK errors, interruptions, etc.
+     * @throws Exception
+     *             ZK errors, interruptions, etc.
      */
-    public Participant getLeader() throws Exception
-    {
-        Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+    public Participant getLeader() throws Exception {
+        Collection<String> participantNodes = LockInternals
+                .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
         return LeaderSelector.getLeader(client, participantNodes);
     }
 
     /**
      * Return true if leadership is currently held by this instance
-     *
+     * 
      * @return true/false
      */
-    public boolean hasLeadership()
-    {
+    public boolean hasLeadership() {
         return (state.get() == State.STARTED) && hasLeadership.get();
     }
 
@@ -434,105 +468,95 @@ public class LeaderLatch implements Closeable
     volatile CountDownLatch debugResetWaitLatch = null;
 
     @VisibleForTesting
-    void reset() throws Exception
-    {
+    void reset() throws Exception {
         setLeadership(false);
         setNode(null);
 
-        BackgroundCallback callback = new BackgroundCallback()
-        {
+        BackgroundCallback callback = new BackgroundCallback() {
             @Override
-            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-            {
-                if ( debugResetWaitLatch != null )
-                {
+            public void processResult(CuratorFramework client,
+                    CuratorEvent event) throws Exception {
+                if (debugResetWaitLatch != null) {
                     debugResetWaitLatch.await();
                     debugResetWaitLatch = null;
                 }
 
-                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
-                {
+                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                     setNode(event.getName());
-                    if ( state.get() == State.CLOSED )
-                    {
+                    if (state.get() == State.CLOSED) {
                         setNode(null);
-                    }
-                    else
-                    {
+                    } else {
                         getChildren();
                     }
-                }
-                else
-                {
-                    log.error("getChildren() failed. rc = " + event.getResultCode());
+                } else {
+                    log.error("getChildren() failed. rc = "
+                            + event.getResultCode());
                 }
             }
         };
-        client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
+        client.create()
+                .creatingParentsIfNeeded()
+                .withProtection()
+                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+                .inBackground(callback)
+                .forPath(ZKPaths.makePath(latchPath, LOCK_NAME),
+                        LeaderSelector.getIdBytes(id));
     }
 
-    private void checkLeadership(List<String> children) throws Exception
-    {
+    private void checkLeadership(List<String> children) throws Exception {
         final String localOurPath = ourPath.get();
-        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
-        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
-        if ( ourIndex < 0 )
-        {
+        List<String> sortedChildren = LockInternals.getSortedChildren(
+                LOCK_NAME, sorter, children);
+        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths
+                .getNodeFromPath(localOurPath)) : -1;
+        if (ourIndex < 0) {
             log.error("Can't find our node. Resetting. Index: " + ourIndex);
             reset();
-        }
-        else if ( ourIndex == 0 )
-        {
+        } else if (ourIndex == 0) {
             setLeadership(true);
-        }
-        else
-        {
+        } else {
             String watchPath = sortedChildren.get(ourIndex - 1);
-            Watcher watcher = new Watcher()
-            {
+            Watcher watcher = new Watcher() {
                 @Override
-                public void process(WatchedEvent event)
-                {
-                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
-                    {
-                        try
-                        {
+                public void process(WatchedEvent event) {
+                    if ((state.get() == State.STARTED)
+                            && (event.getType() == Event.EventType.NodeDeleted)
+                            && (localOurPath != null)) {
+                        try {
                             getChildren();
-                        }
-                        catch ( Exception ex )
-                        {
-                            log.error("An error occurred checking the leadership.", ex);
+                        } catch (Exception ex) {
+                            log.error(
+                                    "An error occurred checking the leadership.",
+                                    ex);
                         }
                     }
                 }
             };
 
-            BackgroundCallback callback = new BackgroundCallback()
-            {
+            BackgroundCallback callback = new BackgroundCallback() {
                 @Override
-                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-                {
-                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
-                    {
+                public void processResult(CuratorFramework client,
+                        CuratorEvent event) throws Exception {
+                    if (event.getResultCode() == KeeperException.Code.NONODE
+                            .intValue()) {
                         // previous node is gone - reset
                         reset();
                     }
                 }
             };
-            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
-            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
+            // use getData() instead of exists() to avoid leaving unneeded
+            // watchers which is a type of resource leak
+            client.getData().usingWatcher(watcher).inBackground(callback)
+                    .forPath(ZKPaths.makePath(latchPath, watchPath));
         }
     }
 
-    private void getChildren() throws Exception
-    {
-        BackgroundCallback callback = new BackgroundCallback()
-        {
+    private void getChildren() throws Exception {
+        BackgroundCallback callback = new BackgroundCallback() {
             @Override
-            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-            {
-                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
-                {
+            public void processResult(CuratorFramework client,
+                    CuratorEvent event) throws Exception {
+                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                     checkLeadership(event.getChildren());
                 }
             }
@@ -540,82 +564,47 @@ public class LeaderLatch implements Closeable
         client.getChildren().inBackground(callback).forPath(latchPath);
     }
 
-    private void handleStateChange(ConnectionState newState)
-    {
-        switch ( newState )
-        {
-        default:
-        {
-            // NOP
-            break;
-        }
-
-        case RECONNECTED:
-        {
-            try
-            {
+    private void handleStateChange(ConnectionState newState) {
+        if (newState.isConnected()) {
+            try {
                 reset();
-            }
-            catch ( Exception e )
-            {
+            } catch (Exception e) {
                 log.error("Could not reset leader latch", e);
                 setLeadership(false);
             }
-            break;
-        }
-
-        case SUSPENDED:
-        case LOST:
-        {
+        } else {
             setLeadership(false);
-            break;
-        }
         }
     }
 
-    private synchronized void setLeadership(boolean newValue)
-    {
+    private synchronized void setLeadership(boolean newValue) {
         boolean oldValue = hasLeadership.getAndSet(newValue);
 
-        if ( oldValue && !newValue )
-        { // Lost leadership, was true, now false
-            listeners.forEach
-            (
-                new Function<LeaderLatchListener, Void>()
-                {
-                    @Override
-                    public Void apply(LeaderLatchListener listener)
-                    {
-                        listener.notLeader();
-                        return null;
-                    }
+        if (oldValue && !newValue) { // Lost leadership, was true, now false
+            listeners.forEach(new Function<LeaderLatchListener, Void>() {
+                @Override
+                public Void apply(LeaderLatchListener listener) {
+                    listener.notLeader();
+                    return null;
                 }
-            );
-        }
-        else if ( !oldValue && newValue )
-        { // Gained leadership, was false, now true
-            listeners.forEach
-            (
-                new Function<LeaderLatchListener, Void>()
-                {
-                    @Override
-                    public Void apply(LeaderLatchListener input)
-                    {
-                        input.isLeader();
-                        return null;
-                    }
+            });
+        } else if (!oldValue && newValue) { // Gained leadership, was false, now
+                                            // true
+            listeners.forEach(new Function<LeaderLatchListener, Void>() {
+                @Override
+                public Void apply(LeaderLatchListener input) {
+                    input.isLeader();
+                    return null;
                 }
-            );
+            });
         }
 
         notifyAll();
     }
 
-    private void setNode(String newValue) throws Exception
-    {
+    private void setNode(String newValue) throws Exception {
         String oldPath = ourPath.getAndSet(newValue);
-        if ( oldPath != null )
-        {
+        if (oldPath != null) {
             client.delete().guaranteed().inBackground().forPath(oldPath);
         }
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/1a63a102/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index b827e15..108f118 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -19,9 +19,17 @@
 
 package org.apache.curator.framework.recipes.leader;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
@@ -33,36 +41,29 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
-public class TestLeaderLatch extends BaseClassForTests
-{
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class TestLeaderLatch extends BaseClassForTests {
     private static final String PATH_NAME = "/one/two/me";
     private static final int MAX_LOOPS = 5;
 
     @Test
-    public void testResetRace() throws Exception
-    {
+    public void testResetRace() throws Exception {
         Timing timing = new Timing();
         LeaderLatch latch = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
             latch = new LeaderLatch(client, PATH_NAME);
 
             latch.debugResetWaitLatch = new CountDownLatch(1);
-            latch.start();  // will call reset()
-            latch.reset();  // should not result in two nodes
+            latch.start(); // will call reset()
+            latch.reset(); // should not result in two nodes
 
             timing.sleepABit();
 
@@ -70,22 +71,21 @@ public class TestLeaderLatch extends BaseClassForTests
 
             timing.sleepABit();
 
-            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 1);
-        }
-        finally
-        {
+            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
+                    1);
+        } finally {
             CloseableUtils.closeQuietly(latch);
             CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
-    public void testCreateDeleteRace() throws Exception
-    {
+    public void testCreateDeleteRace() throws Exception {
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
             LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
 
@@ -100,43 +100,40 @@ public class TestLeaderLatch extends BaseClassForTests
 
             timing.sleepABit();
 
-            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 0);
+            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
+                    0);
 
-        }
-        finally
-        {
+        } finally {
             CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
-    public void testLostConnection() throws Exception
-    {
+    public void testLostConnection() throws Exception {
         final int PARTICIPANT_QTY = 10;
 
         List<LeaderLatch> latches = Lists.newArrayList();
 
         final Timing timing = new Timing();
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
 
             final CountDownLatch countDownLatch = new CountDownLatch(1);
-            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
-            {
-                @Override
-                public void stateChanged(CuratorFramework client, ConnectionState newState)
-                {
-                    if ( newState == ConnectionState.LOST )
-                    {
-                        countDownLatch.countDown();
-                    }
-                }
-            });
+            client.getConnectionStateListenable().addListener(
+                    new ConnectionStateListener() {
+                        @Override
+                        public void stateChanged(CuratorFramework client,
+                                ConnectionState newState) {
+                            if (newState == ConnectionState.LOST) {
+                                countDownLatch.countDown();
+                            }
+                        }
+                    });
 
-            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-            {
+            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
                 latch.start();
                 latches.add(latch);
@@ -151,13 +148,12 @@ public class TestLeaderLatch extends BaseClassForTests
 
             Assert.assertEquals(getLeaders(latches).size(), 0);
 
-            server = new TestingServer(server.getPort(), server.getTempDirectory());
-            Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should reconnect
-        }
-        finally
-        {
-            for ( LeaderLatch latch : latches )
-            {
+            server = new TestingServer(server.getPort(),
+                    server.getTempDirectory());
+            Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should
+                                                                            // reconnect
+        } finally {
+            for (LeaderLatch latch : latches) {
                 CloseableUtils.closeQuietly(latch);
             }
             CloseableUtils.closeQuietly(client);
@@ -165,21 +161,20 @@ public class TestLeaderLatch extends BaseClassForTests
     }
 
     @Test
-    public void testCorrectWatching() throws Exception
-    {
+    public void testCorrectWatching() throws Exception {
         final int PARTICIPANT_QTY = 10;
         final int PARTICIPANT_ID = 2;
 
         List<LeaderLatch> latches = Lists.newArrayList();
 
         final Timing timing = new Timing();
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
 
-            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-            {
+            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
                 latch.start();
                 latches.add(latch);
@@ -187,20 +182,20 @@ public class TestLeaderLatch extends BaseClassForTests
 
             waitForALeader(latches, timing);
 
-            //we need to close a Participant that doesn't be actual leader (first Participant) nor the last
+            // we need to close a Participant that doesn't be actual leader
+            // (first Participant) nor the last
             latches.get(PARTICIPANT_ID).close();
 
-            //As the previous algorithm assumed that if the watched node is deleted gets the leadership
-            //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected as leader.
+            // As the previous algorithm assumed that if the watched node is
+            // deleted gets the leadership
+            // we need to ensure that the PARTICIPANT_ID-1 is not getting
+            // (wrongly) elected as leader.
             Assert.assertTrue(!latches.get(PARTICIPANT_ID - 1).hasLeadership());
-        }
-        finally
-        {
-            //removes the already closed participant
+        } finally {
+            // removes the already closed participant
             latches.remove(PARTICIPANT_ID);
 
-            for ( LeaderLatch latch : latches )
-            {
+            for (LeaderLatch latch : latches) {
                 CloseableUtils.closeQuietly(latch);
             }
             CloseableUtils.closeQuietly(client);
@@ -209,37 +204,35 @@ public class TestLeaderLatch extends BaseClassForTests
     }
 
     @Test
-    public void testWaiting() throws Exception
-    {
+    public void testWaiting() throws Exception {
         final int PARTICIPANT_QTY = 10;
 
-        ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
-        ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(executorService);
+        ExecutorService executorService = Executors
+                .newFixedThreadPool(PARTICIPANT_QTY);
+        ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(
+                executorService);
 
         final Timing timing = new Timing();
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
 
             final AtomicBoolean thereIsALeader = new AtomicBoolean(false);
-            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-            {
-                service.submit(new Callable<Void>()
-                {
+            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+                service.submit(new Callable<Void>() {
                     @Override
-                    public Void call() throws Exception
-                    {
+                    public Void call() throws Exception {
                         LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-                        try
-                        {
+                        try {
                             latch.start();
-                            Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
-                            Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
-                            Thread.sleep((int)(10 * Math.random()));
-                        }
-                        finally
-                        {
+                            Assert.assertTrue(latch.await(timing.forWaiting()
+                                    .seconds(), TimeUnit.SECONDS));
+                            Assert.assertTrue(thereIsALeader.compareAndSet(
+                                    false, true));
+                            Thread.sleep((int) (10 * Math.random()));
+                        } finally {
                             thereIsALeader.set(false);
                             latch.close();
                         }
@@ -248,68 +241,58 @@ public class TestLeaderLatch extends BaseClassForTests
                 });
             }
 
-            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-            {
+            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
                 service.take().get();
             }
-        }
-        finally
-        {
+        } finally {
             executorService.shutdown();
             CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
-    public void testBasic() throws Exception
-    {
+    public void testBasic() throws Exception {
         basic(Mode.START_IMMEDIATELY);
     }
 
     @Test
-    public void testBasicAlt() throws Exception
-    {
+    public void testBasicAlt() throws Exception {
         basic(Mode.START_IN_THREADS);
     }
 
     @Test
-    public void testCallbackSanity() throws Exception
-    {
+    public void testCallbackSanity() throws Exception {
         final int PARTICIPANT_QTY = 10;
         final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
         final AtomicLong masterCounter = new AtomicLong(0);
         final AtomicLong notLeaderCounter = new AtomicLong(0);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build());
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        ExecutorService exec = Executors
+                .newSingleThreadExecutor(new ThreadFactoryBuilder()
+                        .setDaemon(true).setNameFormat("callbackSanity-%s")
+                        .build());
 
         List<LeaderLatch> latches = Lists.newArrayList();
-        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-        {
+        for (int i = 0; i < PARTICIPANT_QTY; ++i) {
             final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-            latch.addListener(new LeaderLatchListener()
-            {
+            latch.addListener(new LeaderLatchListener() {
                 boolean beenLeader = false;
 
                 @Override
-                public void isLeader()
-                {
-                    if ( !beenLeader )
-                    {
+                public void isLeader() {
+                    if (!beenLeader) {
                         masterCounter.incrementAndGet();
                         beenLeader = true;
-                        try
-                        {
+                        try {
                             latch.reset();
-                        }
-                        catch ( Exception e )
-                        {
+                        } catch (Exception e) {
                             throw Throwables.propagate(e);
                         }
-                    }
-                    else
-                    {
+                    } else {
                         masterCounter.incrementAndGet();
                         CloseableUtils.closeQuietly(latch);
                         timesSquare.countDown();
@@ -317,20 +300,17 @@ public class TestLeaderLatch extends BaseClassForTests
                 }
 
                 @Override
-                public void notLeader()
-                {
+                public void notLeader() {
                     notLeaderCounter.incrementAndGet();
                 }
             }, exec);
             latches.add(latch);
         }
 
-        try
-        {
+        try {
             client.start();
 
-            for ( LeaderLatch latch : latches )
-            {
+            for (LeaderLatch latch : latches) {
                 latch.start();
             }
 
@@ -338,17 +318,12 @@ public class TestLeaderLatch extends BaseClassForTests
 
             Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
             Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY);
-            for ( LeaderLatch latch : latches )
-            {
+            for (LeaderLatch latch : latches) {
                 Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
             }
-        }
-        finally
-        {
-            for ( LeaderLatch latch : latches )
-            {
-                if ( latch.getState() != LeaderLatch.State.CLOSED )
-                {
+        } finally {
+            for (LeaderLatch latch : latches) {
+                if (latch.getState() != LeaderLatch.State.CLOSED) {
                     CloseableUtils.closeQuietly(latch);
                 }
             }
@@ -357,8 +332,7 @@ public class TestLeaderLatch extends BaseClassForTests
     }
 
     @Test
-    public void testCallbackNotifyLeader() throws Exception
-    {
+    public void testCallbackNotifyLeader() throws Exception {
         final int PARTICIPANT_QTY = 10;
         final int SILENT_QTY = 3;
 
@@ -367,37 +341,35 @@ public class TestLeaderLatch extends BaseClassForTests
         final AtomicLong notLeaderCounter = new AtomicLong(0);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build());
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        ExecutorService exec = Executors
+                .newSingleThreadExecutor(new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("callbackNotifyLeader-%s").build());
 
         List<LeaderLatch> latches = Lists.newArrayList();
-        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-        {
-            LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT : LeaderLatch.CloseMode.NOTIFY_LEADER;
+        for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+            LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT
+                    : LeaderLatch.CloseMode.NOTIFY_LEADER;
 
-            final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
-            latch.addListener(new LeaderLatchListener()
-            {
+            final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "",
+                    closeMode);
+            latch.addListener(new LeaderLatchListener() {
                 boolean beenLeader = false;
 
                 @Override
-                public void isLeader()
-                {
-                    if ( !beenLeader )
-                    {
+                public void isLeader() {
+                    if (!beenLeader) {
                         masterCounter.incrementAndGet();
                         beenLeader = true;
-                        try
-                        {
+                        try {
                             latch.reset();
-                        }
-                        catch ( Exception e )
-                        {
+                        } catch (Exception e) {
                             throw Throwables.propagate(e);
                         }
-                    }
-                    else
-                    {
+                    } else {
                         masterCounter.incrementAndGet();
                         CloseableUtils.closeQuietly(latch);
                         timesSquare.countDown();
@@ -405,38 +377,31 @@ public class TestLeaderLatch extends BaseClassForTests
                 }
 
                 @Override
-                public void notLeader()
-                {
+                public void notLeader() {
                     notLeaderCounter.incrementAndGet();
                 }
             }, exec);
             latches.add(latch);
         }
 
-        try
-        {
+        try {
             client.start();
 
-            for ( LeaderLatch latch : latches )
-            {
+            for (LeaderLatch latch : latches) {
                 latch.start();
             }
 
             timesSquare.await();
 
             Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
-            Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
-            for ( LeaderLatch latch : latches )
-            {
+            Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2
+                    - SILENT_QTY);
+            for (LeaderLatch latch : latches) {
                 Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
             }
-        }
-        finally
-        {
-            for ( LeaderLatch latch : latches )
-            {
-                if ( latch.getState() != LeaderLatch.State.CLOSED )
-                {
+        } finally {
+            for (LeaderLatch latch : latches) {
+                if (latch.getState() != LeaderLatch.State.CLOSED) {
                     CloseableUtils.closeQuietly(latch);
                 }
             }
@@ -445,47 +410,42 @@ public class TestLeaderLatch extends BaseClassForTests
     }
 
     @Test
-    public void testCallbackDontNotify() throws Exception
-    {
+    public void testCallbackDontNotify() throws Exception {
         final AtomicLong masterCounter = new AtomicLong(0);
         final AtomicLong notLeaderCounter = new AtomicLong(0);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
 
         final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
-        final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+        final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME,
+                "", LeaderLatch.CloseMode.NOTIFY_LEADER);
 
-        leader.addListener(new LeaderLatchListener()
-        {
+        leader.addListener(new LeaderLatchListener() {
             @Override
-            public void isLeader()
-            {
+            public void isLeader() {
             }
 
             @Override
-            public void notLeader()
-            {
+            public void notLeader() {
                 masterCounter.incrementAndGet();
             }
         });
 
-        notifiedLeader.addListener(new LeaderLatchListener()
-        {
+        notifiedLeader.addListener(new LeaderLatchListener() {
             @Override
-            public void isLeader()
-            {
+            public void isLeader() {
             }
 
             @Override
-            public void notLeader()
-            {
+            public void notLeader() {
                 notLeaderCounter.incrementAndGet();
             }
         });
 
-        try
-        {
+        try {
             client.start();
 
             leader.start();
@@ -504,63 +464,116 @@ public class TestLeaderLatch extends BaseClassForTests
             leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
 
             Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
-            Assert.assertEquals(notifiedLeader.getState(), LeaderLatch.State.CLOSED);
+            Assert.assertEquals(notifiedLeader.getState(),
+                    LeaderLatch.State.CLOSED);
 
             Assert.assertEquals(masterCounter.get(), 1);
             Assert.assertEquals(notLeaderCounter.get(), 0);
-        }
-        finally
-        {
-            if ( leader.getState() != LeaderLatch.State.CLOSED )
-            {
+        } finally {
+            if (leader.getState() != LeaderLatch.State.CLOSED) {
                 CloseableUtils.closeQuietly(leader);
             }
-            if ( notifiedLeader.getState() != LeaderLatch.State.CLOSED )
-            {
+            if (notifiedLeader.getState() != LeaderLatch.State.CLOSED) {
                 CloseableUtils.closeQuietly(notifiedLeader);
             }
             CloseableUtils.closeQuietly(client);
         }
     }
 
-    private enum Mode
-    {
-        START_IMMEDIATELY,
-        START_IN_THREADS
+    @Test
+    public void testNoServerAtStart() {
+        CloseableUtils.closeQuietly(server);
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+
+        client.start();
+
+        final CountDownLatch sessionLostCount = new CountDownLatch(1);
+
+        // Need to ensure that we've actually lost the connection completely
+        // before starting. Otherwise it's possible that the server will start
+        // during retries of the inital commands to create the latch zNodes
+        client.getConnectionStateListenable().addListener(
+                new ConnectionStateListener() {
+
+                    @Override
+                    public void stateChanged(CuratorFramework client,
+                            ConnectionState newState) {
+                        if (newState == ConnectionState.LOST) {
+                            sessionLostCount.countDown();
+                        }
+                    }
+                });
+
+        final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
+        final CountDownLatch leaderCounter = new CountDownLatch(1);
+        leader.addListener(new LeaderLatchListener() {
+
+            @Override
+            public void isLeader() {
+                leaderCounter.countDown();
+            }
+
+            @Override
+            public void notLeader() {
+            }
+
+        });
+
+        try {
+            leader.start();
+
+            timing.awaitLatch(sessionLostCount);
+
+            // Start the new server
+            server = new TestingServer(server.getPort());
+
+            timing.awaitLatch(leaderCounter);
+
+        } catch (Exception e) {
+            Assert.fail("Unexpected exception", e);
+        } finally {
+            CloseableUtils.closeQuietly(leader);
+            CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(server);
+        }
+
+    }
+
+    private enum Mode {
+        START_IMMEDIATELY, START_IN_THREADS
     }
 
-    private void basic(Mode mode) throws Exception
-    {
-        final int PARTICIPANT_QTY = 1;//0;
+    private void basic(Mode mode) throws Exception {
+        final int PARTICIPANT_QTY = 1;// 0;
 
         List<LeaderLatch> latches = Lists.newArrayList();
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
 
-            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-            {
+            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-                if ( mode == Mode.START_IMMEDIATELY )
-                {
+                if (mode == Mode.START_IMMEDIATELY) {
                     latch.start();
                 }
                 latches.add(latch);
             }
-            if ( mode == Mode.START_IN_THREADS )
-            {
-                ExecutorService service = Executors.newFixedThreadPool(latches.size());
-                for ( final LeaderLatch latch : latches )
-                {
-                    service.submit(new Callable<Object>()
-                    {
+            if (mode == Mode.START_IN_THREADS) {
+                ExecutorService service = Executors.newFixedThreadPool(latches
+                        .size());
+                for (final LeaderLatch latch : latches) {
+                    service.submit(new Callable<Object>() {
                         @Override
-                        public Object call() throws Exception
-                        {
-                            Thread.sleep((int)(100 * Math.random()));
+                        public Object call() throws Exception {
+                            Thread.sleep((int) (100 * Math.random()));
                             latch.start();
                             return null;
                         }
@@ -569,36 +582,38 @@ public class TestLeaderLatch extends BaseClassForTests
                 service.shutdown();
             }
 
-            while ( latches.size() > 0 )
-            {
+            while (latches.size() > 0) {
                 List<LeaderLatch> leaders = waitForALeader(latches, timing);
-                Assert.assertEquals(leaders.size(), 1); // there can only be one leader
+                Assert.assertEquals(leaders.size(), 1); // there can only be one
+                                                        // leader
                 LeaderLatch theLeader = leaders.get(0);
-                if ( mode == Mode.START_IMMEDIATELY )
-                {
-                    Assert.assertEquals(latches.indexOf(theLeader), 0); // assert ordering - leadership should advance in start order
+                if (mode == Mode.START_IMMEDIATELY) {
+                    Assert.assertEquals(latches.indexOf(theLeader), 0); // assert
+                                                                        // ordering
+                                                                        // -
+                                                                        // leadership
+                                                                        // should
+                                                                        // advance
+                                                                        // in
+                                                                        // start
+                                                                        // order
                 }
                 theLeader.close();
                 latches.remove(theLeader);
             }
-        }
-        finally
-        {
-            for ( LeaderLatch latch : latches )
-            {
+        } finally {
+            for (LeaderLatch latch : latches) {
                 CloseableUtils.closeQuietly(latch);
             }
             CloseableUtils.closeQuietly(client);
         }
     }
 
-    private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches, Timing timing) throws InterruptedException
-    {
-        for ( int i = 0; i < MAX_LOOPS; ++i )
-        {
+    private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches,
+            Timing timing) throws InterruptedException {
+        for (int i = 0; i < MAX_LOOPS; ++i) {
             List<LeaderLatch> leaders = getLeaders(latches);
-            if ( leaders.size() != 0 )
-            {
+            if (leaders.size() != 0) {
                 return leaders;
             }
             timing.sleepABit();
@@ -606,13 +621,10 @@ public class TestLeaderLatch extends BaseClassForTests
         return Lists.newArrayList();
     }
 
-    private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches)
-    {
+    private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches) {
         List<LeaderLatch> leaders = Lists.newArrayList();
-        for ( LeaderLatch latch : latches )
-        {
-            if ( latch.hasLeadership() )
-            {
+        for (LeaderLatch latch : latches) {
+            if (latch.hasLeadership()) {
                 leaders.add(latch);
             }
         }


Mime
View raw message