curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [3/3] git commit: CURATOR-22 Add a listener to LeaderLatch
Date Thu, 09 May 2013 23:47:16 GMT
CURATOR-22
Add a listener to LeaderLatch


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/3d6181ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/3d6181ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/3d6181ca

Branch: refs/heads/CURATOR-22
Commit: 3d6181cae984807bd2f89cecc2e5b55d0574a5b3
Parents: 9acf592
Author: randgalt <randgalt@apache.org>
Authored: Thu May 9 16:46:50 2013 -0700
Committer: randgalt <randgalt@apache.org>
Committed: Thu May 9 16:46:50 2013 -0700

----------------------------------------------------------------------
 .../framework/recipes/leader/LeaderLatch.java      |  244 ++++++++++-----
 .../recipes/leader/LeaderLatchListener.java        |   46 +++
 .../framework/recipes/leader/TestLeaderLatch.java  |  181 ++++++++---
 3 files changed, 351 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 36a0636..508ca7c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -16,13 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.leader;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.locks.LockInternals;
 import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -41,29 +44,31 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * <p>
- *     Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected
to
- *     a Zookeeper cluster. If a group of N thread/processes contend for leadership one will
- *     randomly be assigned leader until it releases leadership at which time another one
from the
- *     group will randomly be chosen
+ * Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected
to
+ * a Zookeeper cluster. If a group of N thread/processes contend for leadership one will
+ * randomly be assigned leader until it releases leadership at which time another one from
the
+ * group will randomly be chosen
  * </p>
  */
 public class LeaderLatch implements Closeable
 {
-    private final Logger                                log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework                      client;
-    private final String                                latchPath;
-    private final String                                id;
-    private final AtomicReference<State>                state = new AtomicReference<State>(State.LATENT);
-    private final AtomicBoolean                         hasLeadership = new AtomicBoolean(false);
-    private final AtomicReference<String>               ourPath = new AtomicReference<String>();
-
-    private final ConnectionStateListener               listener = new ConnectionStateListener()
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final String latchPath;
+    private final String id;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
+    private final AtomicReference<String> ourPath = new AtomicReference<String>();
+    private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
+
+    private final ConnectionStateListener listener = new ConnectionStateListener()
     {
         @Override
         public void stateChanged(CuratorFramework client, ConnectionState newState)
@@ -74,7 +79,7 @@ public class LeaderLatch implements Closeable
 
     private static final String LOCK_NAME = "latch-";
 
-    private static final LockInternalsSorter        sorter = new LockInternalsSorter()
+    private static final LockInternalsSorter sorter = new LockInternalsSorter()
     {
         @Override
         public String fixForSorting(String str, String lockName)
@@ -83,7 +88,7 @@ public class LeaderLatch implements Closeable
         }
     };
 
-    private enum State
+    public enum State
     {
         LATENT,
         STARTED,
@@ -91,7 +96,7 @@ public class LeaderLatch implements Closeable
     }
 
     /**
-     * @param client the client
+     * @param client    the client
      * @param latchPath the path for this leadership group
      */
     public LeaderLatch(CuratorFramework client, String latchPath)
@@ -100,9 +105,9 @@ public class LeaderLatch implements Closeable
     }
 
     /**
-     * @param client the client
+     * @param client    the client
      * @param latchPath the path for this leadership group
-     * @param id participant ID
+     * @param id        participant ID
      */
     public LeaderLatch(CuratorFramework client, String latchPath, String id)
     {
@@ -147,16 +152,60 @@ public class LeaderLatch implements Closeable
         finally
         {
             client.getConnectionStateListenable().removeListener(listener);
+            listeners.clear();
             setLeadership(false);
         }
     }
 
     /**
+     * Attaches a listener to this LeaderLatch
+     * <p/>
+     * Attaching the same listener multiple times is a noop from the second time on.
+     * <p/>
+     * All methods for the listener are run using the provided Executor.  It is common to
pass in a single-threaded
+     * executor so that you can be certain that listener methods are called in sequence,
but if you are fine with
+     * them being called out of order you are welcome to use multiple threads.
+     *
+     * @param listener the listener to attach
+     */
+    public void addListener(LeaderLatchListener listener)
+    {
+        listeners.addListener(listener);
+    }
+
+    /**
+     * Attaches a listener to this LeaderLatch
+     * <p/>
+     * Attaching the same listener multiple times is a noop from the second time on.
+     * <p/>
+     * All methods for the listener are run using the provided Executor.  It is common to
pass in a single-threaded
+     * executor so that you can be certain that listener methods are called in sequence,
but if you are fine with
+     * them being called out of order you are welcome to use multiple threads.
+     *
+     * @param listener the listener to attach
+     * @param executor     An executor to run the methods for the listener on.
+     */
+    public void addListener(LeaderLatchListener listener, Executor executor)
+    {
+        listeners.addListener(listener, executor);
+    }
+
+    /**
+     * Removes a given listener from this LeaderLatch
+     *
+     * @param listener the listener to remove
+     */
+    public void removeListener(LeaderLatchListener listener)
+    {
+        listeners.removeListener(listener);
+    }
+
+    /**
      * <p>Causes the current thread to wait until this instance acquires leadership
      * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close()
closed}.</p>
-     *
+     * <p/>
      * <p>If this instance already is the leader then this method returns immediately.</p>
-     *
+     * <p/>
      * <p>Otherwise the current
      * thread becomes disabled for thread scheduling purposes and lies
      * dormant until one of three things happen:
@@ -166,7 +215,7 @@ public class LeaderLatch implements Closeable
      * the current thread</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul></p>
-     *
+     * <p/>
      * <p>If the current thread:
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
@@ -176,9 +225,9 @@ public class LeaderLatch implements Closeable
      * interrupted status is cleared.</p>
      *
      * @throws InterruptedException if the current thread is interrupted
-     *         while waiting
-     * @throws EOFException if the instance is {@linkplain #close() closed}
-     *         while waiting
+     *                              while waiting
+     * @throws EOFException         if the instance is {@linkplain #close() closed}
+     *                              while waiting
      */
     public void await() throws InterruptedException, EOFException
     {
@@ -199,10 +248,10 @@ public class LeaderLatch implements Closeable
      * <p>Causes the current thread to wait until this instance acquires leadership
      * unless the thread is {@linkplain Thread#interrupt interrupted},
      * the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
-     *
+     * <p/>
      * <p>If this instance already is the leader then this method returns immediately
      * with the value {@code true}.</p>
-     *
+     * <p/>
      * <p>Otherwise the current
      * thread becomes disabled for thread scheduling purposes and lies
      * dormant until one of four things happen:
@@ -213,7 +262,7 @@ public class LeaderLatch implements Closeable
      * <li>The specified waiting time elapses.</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul>
-     *
+     * <p/>
      * <p>If the current thread:
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
@@ -221,29 +270,29 @@ public class LeaderLatch implements Closeable
      * </ul>
      * then {@link InterruptedException} is thrown and the current thread's
      * interrupted status is cleared.</p>
-     *
+     * <p/>
      * <p>If the specified waiting time elapses or the instance is {@linkplain #close()
closed}
      * then the value {@code false} is returned.  If the time is less than or equal to zero,
the method
      * will not wait at all.</p>
      *
      * @param timeout the maximum time to wait
-     * @param unit the time unit of the {@code timeout} argument
+     * @param unit    the time unit of the {@code timeout} argument
      * @return {@code true} if the count reached zero and {@code false}
      *         if the waiting time elapsed before the count reached zero or the instances
was closed
      * @throws InterruptedException if the current thread is interrupted
-     *         while waiting
+     *                              while waiting
      */
     public boolean await(long timeout, TimeUnit unit) throws InterruptedException
     {
-        long        waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
+        long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
 
         synchronized(this)
         {
             while ( (waitNanos > 0) && (state.get() == State.STARTED) &&
!hasLeadership.get() )
             {
-                long        startNanos = System.nanoTime();
+                long startNanos = System.nanoTime();
                 TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
-                long        elapsed = System.nanoTime() - startNanos;
+                long elapsed = System.nanoTime() - startNanos;
                 waitNanos -= elapsed;
             }
         }
@@ -261,14 +310,27 @@ public class LeaderLatch implements Closeable
     }
 
     /**
+     * Returns this instances current state, this is the only way to verify that the object
has been closed before
+     * closing again.  If you try to close a latch multiple times, the close() method will
throw an
+     * IllegalArgumentException which is often not caught and ignored (Closeables.closeQuietly()
only looks for
+     * IOException).
+     *
+     * @return the state of the current instance
+     */
+    public State getState()
+    {
+        return state.get();
+    }
+
+    /**
      * <p>
-     *     Returns the set of current participants in the leader selection
+     * Returns the set of current participants in the leader selection
      * </p>
-     *
+     * <p/>
      * <p>
-     *     <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     *     return a value that does not match {@link #hasLeadership()} as hasLeadership
-     *     uses a local field of the class.
+     * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
+     * return a value that does not match {@link #hasLeadership()} as hasLeadership
+     * uses a local field of the class.
      * </p>
      *
      * @return participants
@@ -282,20 +344,20 @@ public class LeaderLatch implements Closeable
 
     /**
      * <p>
-     *     Return the id for the current leader. If for some reason there is no
-     *     current leader, a dummy participant is returned.
+     * Return the id for the current leader. If for some reason there is no
+     * current leader, a dummy participant is returned.
      * </p>
-     *
+     * <p/>
      * <p>
-     *     <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     *     return a value that does not match {@link #hasLeadership()} as hasLeadership
-     *     uses a local field of the class.
+     * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
+     * return a value that does not match {@link #hasLeadership()} as hasLeadership
+     * uses a local field of the class.
      * </p>
      *
      * @return leader
      * @throws Exception ZK errors, interruptions, etc.
      */
-    public Participant      getLeader() throws Exception
+    public Participant getLeader() throws Exception
     {
         Collection<String> participantNodes = LockInternals.getParticipantNodes(client,
latchPath, LOCK_NAME, sorter);
         return LeaderSelector.getLeader(client, participantNodes);
@@ -320,7 +382,7 @@ public class LeaderLatch implements Closeable
         setLeadership(false);
         setNode(null);
 
-        BackgroundCallback          callback = new BackgroundCallback()
+        BackgroundCallback callback = new BackgroundCallback()
         {
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
@@ -347,9 +409,9 @@ public class LeaderLatch implements Closeable
 
     private void checkLeadership(List<String> children) throws Exception
     {
-        final String    localOurPath = ourPath.get();
-        List<String>    sortedChildren = LockInternals.getSortedChildren(LOCK_NAME,
sorter, children);
-        int             ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath))
: -1;
+        final String localOurPath = ourPath.get();
+        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter,
children);
+        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath))
: -1;
         if ( ourIndex < 0 )
         {
             log.error("Can't find our node. Resetting. Index: " + ourIndex);
@@ -361,7 +423,7 @@ public class LeaderLatch implements Closeable
         }
         else
         {
-            String          watchPath = sortedChildren.get(ourIndex - 1);
+            String watchPath = sortedChildren.get(ourIndex - 1);
             Watcher watcher = new Watcher()
             {
                 @Override
@@ -373,7 +435,7 @@ public class LeaderLatch implements Closeable
                         {
                             getChildren();
                         }
-                        catch(Exception ex)
+                        catch ( Exception ex )
                         {
                             log.error("An error occurred checking the leadership.", ex);
                         }
@@ -381,7 +443,7 @@ public class LeaderLatch implements Closeable
                 }
             };
 
-            BackgroundCallback          callback = new BackgroundCallback()
+            BackgroundCallback callback = new BackgroundCallback()
             {
                 @Override
                 public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
@@ -399,7 +461,7 @@ public class LeaderLatch implements Closeable
 
     private void getChildren() throws Exception
     {
-        BackgroundCallback          callback = new BackgroundCallback()
+        BackgroundCallback callback = new BackgroundCallback()
         {
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
@@ -417,44 +479,76 @@ public class LeaderLatch implements Closeable
     {
         switch ( newState )
         {
-            default:
-            {
-                // NOP
-                break;
-            }
+        default:
+        {
+            // NOP
+            break;
+        }
 
-            case RECONNECTED:
+        case RECONNECTED:
+        {
+            try
             {
-                try
-                {
-                    reset();
-                }
-                catch ( Exception e )
-                {
-                    log.error("Could not reset leader latch", e);
-                    setLeadership(false);
-                }
-                break;
+                reset();
             }
-
-            case SUSPENDED:
-            case LOST:
+            catch ( Exception e )
             {
+                log.error("Could not reset leader latch", e);
                 setLeadership(false);
-                break;
             }
+            break;
+        }
+
+        case SUSPENDED:
+        case LOST:
+        {
+            setLeadership(false);
+            break;
+        }
         }
     }
 
     private synchronized void setLeadership(boolean newValue)
     {
-        hasLeadership.set(newValue);
+        boolean oldValue = hasLeadership.getAndSet(newValue);
+
+        if ( oldValue && !newValue )
+        { // Lost leadership, was true, now false
+            listeners.forEach
+            (
+                new Function<LeaderLatchListener, Void>()
+                {
+                    @Override
+                    public Void apply(LeaderLatchListener listener)
+                    {
+                        listener.notLeader();
+                        return null;
+                    }
+                }
+            );
+        }
+        else if ( !oldValue && newValue )
+        { // Gained leadership, was false, now true
+            listeners.forEach
+            (
+                new Function<LeaderLatchListener, Void>()
+                {
+                    @Override
+                    public Void apply(LeaderLatchListener input)
+                    {
+                        input.isLeader();
+                        return null;
+                    }
+                }
+            );
+        }
+
         notifyAll();
     }
 
     private void setNode(String newValue) throws Exception
     {
-        String      oldPath = ourPath.getAndSet(newValue);
+        String oldPath = ourPath.getAndSet(newValue);
         if ( oldPath != null )
         {
             client.delete().guaranteed().inBackground().forPath(oldPath);

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java
new file mode 100644
index 0000000..68dd355
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.leader;
+
+/**
+ * A LeaderLatchListener can be used to be notified asynchronously about when the state of
the LeaderLatch has changed.
+ *
+ * Note that just because you are in the middle of one of these method calls, it does not
necessarily mean that
+ * hasLeadership() is the corresponding true/false value.  It is possible for the state to
change behind the scenes
+ * before these methods get called.  The contract is that if that happens, you should see
another call to the other
+ * method pretty quickly.
+ */
+public interface LeaderLatchListener
+{
+  /**
+   * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership
= true.
+   *
+   * Note that it is possible that by the time this method call happens, hasLeadership has
fallen back to false.  If
+   * this occurs, you can expect {@link #notLeader()} to also be called.
+   */
+  public void isLeader();
+
+  /**
+   * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership
= false.
+   *
+   * Note that it is possible that by the time this method call happens, hasLeadership has
become true.  If
+   * this occurs, you can expect {@link #isLeader()} to also be called.
+   */
+  public void notLeader();
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 307c383..58a61ee 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -16,10 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.leader;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -39,6 +42,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class TestLeaderLatch extends BaseClassForTests
 {
@@ -88,7 +92,7 @@ public class TestLeaderLatch extends BaseClassForTests
         {
             client.start();
 
-            final CountDownLatch        countDownLatch = new CountDownLatch(1);
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
             client.getConnectionStateListenable().addListener
             (
                 new ConnectionStateListener()
@@ -136,47 +140,47 @@ public class TestLeaderLatch extends BaseClassForTests
     @Test
     public void testCorrectWatching() throws Exception
     {
-    	final int PARTICIPANT_QTY = 10;
-    	final int PARTICIPANT_ID = 2;
-    	
-    	List<LeaderLatch> latches = Lists.newArrayList();
+        final int PARTICIPANT_QTY = 10;
+        final int PARTICIPANT_ID = 2;
+
+        List<LeaderLatch> latches = Lists.newArrayList();
 
         final Timing timing = new Timing();
         final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
         try
         {
-             client.start();
-
-             for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-             {
-                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-                 latch.start();
-                 latches.add(latch);
-             }
-
-             waitForALeader(latches, timing);
-             
-             //we need to close a Participant that doesn't be actual leader (first Participant)
nor the last
-             latches.get(PARTICIPANT_ID).close();
-             
-             //As the previous algorithm assumed that if the watched node is deleted gets
the leadership
-             //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected
as leader.
-             Assert.assertTrue(!latches.get(PARTICIPANT_ID-1).hasLeadership());
-	     }
-	     finally
-	     {
-	    	 //removes the already closed participant
-	    	 latches.remove(PARTICIPANT_ID);
-	    	 
-	         for ( LeaderLatch latch : latches )
-	         {
-	             Closeables.closeQuietly(latch);
-	         }
-	         Closeables.closeQuietly(client);
-	     }
+            client.start();
+
+            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+            {
+                LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
+                latch.start();
+                latches.add(latch);
+            }
+
+            waitForALeader(latches, timing);
+
+            //we need to close a Participant that doesn't be actual leader (first Participant)
nor the last
+            latches.get(PARTICIPANT_ID).close();
+
+            //As the previous algorithm assumed that if the watched node is deleted gets
the leadership
+            //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected
as leader.
+            Assert.assertTrue(!latches.get(PARTICIPANT_ID - 1).hasLeadership());
+        }
+        finally
+        {
+            //removes the already closed participant
+            latches.remove(PARTICIPANT_ID);
+
+            for ( LeaderLatch latch : latches )
+            {
+                Closeables.closeQuietly(latch);
+            }
+            Closeables.closeQuietly(client);
+        }
 
     }
-    
+
     @Test
     public void testWaiting() throws Exception
     {
@@ -244,6 +248,93 @@ public class TestLeaderLatch extends BaseClassForTests
         basic(Mode.START_IN_THREADS);
     }
 
+    @Test
+    public void testCallbackSanity() throws Exception
+    {
+        final int PARTICIPANT_QTY = 10;
+        final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
+        final AtomicLong masterCounter = new AtomicLong(0);
+        final AtomicLong dunceCounter = new AtomicLong(0);
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+        ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build());
+
+        List<LeaderLatch> latches = Lists.newArrayList();
+        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+        {
+            final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
+            latch.addListener(
+                new LeaderLatchListener()
+                {
+                    boolean beenLeader = false;
+
+                    @Override
+                    public void isLeader()
+                    {
+                        if ( !beenLeader )
+                        {
+                            masterCounter.incrementAndGet();
+                            beenLeader = true;
+                            try
+                            {
+                                latch.reset();
+                            }
+                            catch ( Exception e )
+                            {
+                                throw Throwables.propagate(e);
+                            }
+                        }
+                        else
+                        {
+                            masterCounter.incrementAndGet();
+                            Closeables.closeQuietly(latch);
+                            timesSquare.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void notLeader()
+                    {
+                        dunceCounter.incrementAndGet();
+                    }
+                },
+                exec
+            );
+            latches.add(latch);
+        }
+
+        try
+        {
+            client.start();
+
+            for ( LeaderLatch latch : latches )
+            {
+                latch.start();
+            }
+
+            timesSquare.await();
+
+            Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
+            Assert.assertEquals(dunceCounter.get(), PARTICIPANT_QTY);
+            for ( LeaderLatch latch : latches )
+            {
+                Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
+            }
+        }
+        finally
+        {
+            for ( LeaderLatch latch : latches )
+            {
+                if ( latch.getState() != LeaderLatch.State.CLOSED )
+                {
+                    Closeables.closeQuietly(latch);
+                }
+            }
+            Closeables.closeQuietly(client);
+        }
+    }
+
     private enum Mode
     {
         START_IMMEDIATELY,
@@ -277,18 +368,18 @@ public class TestLeaderLatch extends BaseClassForTests
                 for ( final LeaderLatch latch : latches )
                 {
                     service.submit
-                    (
-                        new Callable<Object>()
-                        {
-                            @Override
-                            public Object call() throws Exception
+                        (
+                            new Callable<Object>()
                             {
-                                Thread.sleep((int)(100 * Math.random()));
-                                latch.start();
-                                return null;
+                                @Override
+                                public Object call() throws Exception
+                                {
+                                    Thread.sleep((int)(100 * Math.random()));
+                                    latch.start();
+                                    return null;
+                                }
                             }
-                        }
-                    );
+                        );
                 }
                 service.shutdown();
             }


Mime
View raw message