Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D902E11975 for ; Thu, 9 May 2013 23:48:07 +0000 (UTC) Received: (qmail 7245 invoked by uid 500); 9 May 2013 23:48:07 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 7221 invoked by uid 500); 9 May 2013 23:48:07 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 7214 invoked by uid 99); 9 May 2013 23:48:07 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 May 2013 23:48:07 +0000 X-ASF-Spam-Status: No, hits=-2001.2 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 09 May 2013 23:48:02 +0000 Received: (qmail 5496 invoked by uid 99); 9 May 2013 23:47:40 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 May 2013 23:47:40 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BF2D488AD71; Thu, 9 May 2013 23:47:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.incubator.apache.org Date: Thu, 09 May 2013 23:47:42 -0000 Message-Id: <902b822da3934371915dca806cfd90c1@git.apache.org> In-Reply-To: <3f37987360b1420381b3cfefa3fbb95d@git.apache.org> References: <3f37987360b1420381b3cfefa3fbb95d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/4] git commit: CURATOR-22 Add a 
listener to LeaderLatch X-Virus-Checked: Checked by ClamAV on apache.org CURATOR-22 Add a listener to LeaderLatch Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/3d6181ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/3d6181ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/3d6181ca Branch: refs/heads/2.0.1-incubating Commit: 3d6181cae984807bd2f89cecc2e5b55d0574a5b3 Parents: 9acf592 Author: randgalt Authored: Thu May 9 16:46:50 2013 -0700 Committer: randgalt Committed: Thu May 9 16:46:50 2013 -0700 ---------------------------------------------------------------------- .../framework/recipes/leader/LeaderLatch.java | 244 ++++++++++----- .../recipes/leader/LeaderLatchListener.java | 46 +++ .../framework/recipes/leader/TestLeaderLatch.java | 181 ++++++++--- 3 files changed, 351 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 36a0636..508ca7c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -16,13 +16,16 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.curator.framework.recipes.leader; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.recipes.locks.LockInternals; import org.apache.curator.framework.recipes.locks.LockInternalsSorter; import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver; @@ -41,29 +44,31 @@ import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** *

- * Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected to - * a Zookeeper cluster. If a group of N thread/processes contend for leadership one will - * randomly be assigned leader until it releases leadership at which time another one from the - * group will randomly be chosen + * Abstraction to select a "leader" amongst multiple contenders in a group of JVMs connected to + * a Zookeeper cluster. If a group of N thread/processes contend for leadership one will + * randomly be assigned leader until it releases leadership at which time another one from the + * group will randomly be chosen *

*/ public class LeaderLatch implements Closeable { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final CuratorFramework client; - private final String latchPath; - private final String id; - private final AtomicReference state = new AtomicReference(State.LATENT); - private final AtomicBoolean hasLeadership = new AtomicBoolean(false); - private final AtomicReference ourPath = new AtomicReference(); - - private final ConnectionStateListener listener = new ConnectionStateListener() + private final Logger log = LoggerFactory.getLogger(getClass()); + private final CuratorFramework client; + private final String latchPath; + private final String id; + private final AtomicReference state = new AtomicReference(State.LATENT); + private final AtomicBoolean hasLeadership = new AtomicBoolean(false); + private final AtomicReference ourPath = new AtomicReference(); + private final ListenerContainer listeners = new ListenerContainer(); + + private final ConnectionStateListener listener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) @@ -74,7 +79,7 @@ public class LeaderLatch implements Closeable private static final String LOCK_NAME = "latch-"; - private static final LockInternalsSorter sorter = new LockInternalsSorter() + private static final LockInternalsSorter sorter = new LockInternalsSorter() { @Override public String fixForSorting(String str, String lockName) @@ -83,7 +88,7 @@ public class LeaderLatch implements Closeable } }; - private enum State + public enum State { LATENT, STARTED, @@ -91,7 +96,7 @@ public class LeaderLatch implements Closeable } /** - * @param client the client + * @param client the client * @param latchPath the path for this leadership group */ public LeaderLatch(CuratorFramework client, String latchPath) @@ -100,9 +105,9 @@ public class LeaderLatch implements Closeable } /** - * @param client the client + * @param client the client * @param latchPath 
the path for this leadership group - * @param id participant ID + * @param id participant ID */ public LeaderLatch(CuratorFramework client, String latchPath, String id) { @@ -147,16 +152,60 @@ public class LeaderLatch implements Closeable finally { client.getConnectionStateListenable().removeListener(listener); + listeners.clear(); setLeadership(false); } } /** + * Attaches a listener to this LeaderLatch + *

+ * Attaching the same listener multiple times is a no-op from the second time on. + *

+ * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded + * executor so that you can be certain that listener methods are called in sequence, but if you are fine with + * them being called out of order you are welcome to use multiple threads. + * + * @param listener the listener to attach + */ + public void addListener(LeaderLatchListener listener) + { + listeners.addListener(listener); + } + + /** + * Attaches a listener to this LeaderLatch + *

+ * Attaching the same listener multiple times is a no-op from the second time on. + *

+ * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded + * executor so that you can be certain that listener methods are called in sequence, but if you are fine with + * them being called out of order you are welcome to use multiple threads. + * + * @param listener the listener to attach + * @param executor An executor to run the methods for the listener on. + */ + public void addListener(LeaderLatchListener listener, Executor executor) + { + listeners.addListener(listener, executor); + } + + /** + * Removes a given listener from this LeaderLatch + * + * @param listener the listener to remove + */ + public void removeListener(LeaderLatchListener listener) + { + listeners.removeListener(listener); + } + + /** *

Causes the current thread to wait until this instance acquires leadership * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.

- * + *

*

If this instance already is the leader then this method returns immediately.

- * + *

*

Otherwise the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of three things happen: @@ -166,7 +215,7 @@ public class LeaderLatch implements Closeable * the current thread *

  • The instance is {@linkplain #close() closed}
  • *

    - * + *

    *

    If the current thread: *

      *
    • has its interrupted status set on entry to this method; or @@ -176,9 +225,9 @@ public class LeaderLatch implements Closeable * interrupted status is cleared.

      * * @throws InterruptedException if the current thread is interrupted - * while waiting - * @throws EOFException if the instance is {@linkplain #close() closed} - * while waiting + * while waiting + * @throws EOFException if the instance is {@linkplain #close() closed} + * while waiting */ public void await() throws InterruptedException, EOFException { @@ -199,10 +248,10 @@ public class LeaderLatch implements Closeable *

      Causes the current thread to wait until this instance acquires leadership * unless the thread is {@linkplain Thread#interrupt interrupted}, * the specified waiting time elapses or the instance is {@linkplain #close() closed}.

      - * + *

      *

      If this instance already is the leader then this method returns immediately * with the value {@code true}.

      - * + *

      *

      Otherwise the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of four things happen: @@ -213,7 +262,7 @@ public class LeaderLatch implements Closeable *

    • The specified waiting time elapses.
    • *
    • The instance is {@linkplain #close() closed}
    • *
    - * + *

    *

    If the current thread: *

      *
    • has its interrupted status set on entry to this method; or @@ -221,29 +270,29 @@ public class LeaderLatch implements Closeable *
    * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared.

    - * + *

    *

    If the specified waiting time elapses or the instance is {@linkplain #close() closed} * then the value {@code false} is returned. If the time is less than or equal to zero, the method * will not wait at all.

    * * @param timeout the maximum time to wait - * @param unit the time unit of the {@code timeout} argument + * @param unit the time unit of the {@code timeout} argument * @return {@code true} if the count reached zero and {@code false} * if the waiting time elapsed before the count reached zero or the instances was closed * @throws InterruptedException if the current thread is interrupted - * while waiting + * while waiting */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit); + long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit); synchronized(this) { while ( (waitNanos > 0) && (state.get() == State.STARTED) && !hasLeadership.get() ) { - long startNanos = System.nanoTime(); + long startNanos = System.nanoTime(); TimeUnit.NANOSECONDS.timedWait(this, waitNanos); - long elapsed = System.nanoTime() - startNanos; + long elapsed = System.nanoTime() - startNanos; waitNanos -= elapsed; } } @@ -261,14 +310,27 @@ public class LeaderLatch implements Closeable } /** + * Returns this instances current state, this is the only way to verify that the object has been closed before + * closing again. If you try to close a latch multiple times, the close() method will throw an + * IllegalArgumentException which is often not caught and ignored (Closeables.closeQuietly() only looks for + * IOException). + * + * @return the state of the current instance + */ + public State getState() + { + return state.get(); + } + + /** *

    - * Returns the set of current participants in the leader selection + * Returns the set of current participants in the leader selection *

    - * + *

    *

    - * NOTE - this method polls the ZK server. Therefore it can possibly - * return a value that does not match {@link #hasLeadership()} as hasLeadership - * uses a local field of the class. + * NOTE - this method polls the ZK server. Therefore it can possibly + * return a value that does not match {@link #hasLeadership()} as hasLeadership + * uses a local field of the class. *

    * * @return participants @@ -282,20 +344,20 @@ public class LeaderLatch implements Closeable /** *

    - * Return the id for the current leader. If for some reason there is no - * current leader, a dummy participant is returned. + * Return the id for the current leader. If for some reason there is no + * current leader, a dummy participant is returned. *

    - * + *

    *

    - * NOTE - this method polls the ZK server. Therefore it can possibly - * return a value that does not match {@link #hasLeadership()} as hasLeadership - * uses a local field of the class. + * NOTE - this method polls the ZK server. Therefore it can possibly + * return a value that does not match {@link #hasLeadership()} as hasLeadership + * uses a local field of the class. *

    * * @return leader * @throws Exception ZK errors, interruptions, etc. */ - public Participant getLeader() throws Exception + public Participant getLeader() throws Exception { Collection participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter); return LeaderSelector.getLeader(client, participantNodes); @@ -320,7 +382,7 @@ public class LeaderLatch implements Closeable setLeadership(false); setNode(null); - BackgroundCallback callback = new BackgroundCallback() + BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception @@ -347,9 +409,9 @@ public class LeaderLatch implements Closeable private void checkLeadership(List children) throws Exception { - final String localOurPath = ourPath.get(); - List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); - int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; + final String localOurPath = ourPath.get(); + List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); + int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; if ( ourIndex < 0 ) { log.error("Can't find our node. Resetting. 
Index: " + ourIndex); @@ -361,7 +423,7 @@ public class LeaderLatch implements Closeable } else { - String watchPath = sortedChildren.get(ourIndex - 1); + String watchPath = sortedChildren.get(ourIndex - 1); Watcher watcher = new Watcher() { @Override @@ -373,7 +435,7 @@ public class LeaderLatch implements Closeable { getChildren(); } - catch(Exception ex) + catch ( Exception ex ) { log.error("An error occurred checking the leadership.", ex); } @@ -381,7 +443,7 @@ public class LeaderLatch implements Closeable } }; - BackgroundCallback callback = new BackgroundCallback() + BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception @@ -399,7 +461,7 @@ public class LeaderLatch implements Closeable private void getChildren() throws Exception { - BackgroundCallback callback = new BackgroundCallback() + BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception @@ -417,44 +479,76 @@ public class LeaderLatch implements Closeable { switch ( newState ) { - default: - { - // NOP - break; - } + default: + { + // NOP + break; + } - case RECONNECTED: + case RECONNECTED: + { + try { - try - { - reset(); - } - catch ( Exception e ) - { - log.error("Could not reset leader latch", e); - setLeadership(false); - } - break; + reset(); } - - case SUSPENDED: - case LOST: + catch ( Exception e ) { + log.error("Could not reset leader latch", e); setLeadership(false); - break; } + break; + } + + case SUSPENDED: + case LOST: + { + setLeadership(false); + break; + } } } private synchronized void setLeadership(boolean newValue) { - hasLeadership.set(newValue); + boolean oldValue = hasLeadership.getAndSet(newValue); + + if ( oldValue && !newValue ) + { // Lost leadership, was true, now false + listeners.forEach + ( + new Function() + { + @Override + public Void apply(LeaderLatchListener listener) + { + 
listener.notLeader(); + return null; + } + } + ); + } + else if ( !oldValue && newValue ) + { // Gained leadership, was false, now true + listeners.forEach + ( + new Function() + { + @Override + public Void apply(LeaderLatchListener input) + { + input.isLeader(); + return null; + } + } + ); + } + notifyAll(); } private void setNode(String newValue) throws Exception { - String oldPath = ourPath.getAndSet(newValue); + String oldPath = ourPath.getAndSet(newValue); if ( oldPath != null ) { client.delete().guaranteed().inBackground().forPath(oldPath); http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java new file mode 100644 index 0000000..68dd355 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.leader; + +/** + * A LeaderLatchListener can be used to be notified asynchronously about when the state of the LeaderLatch has changed. + * + * Note that just because you are in the middle of one of these method calls, it does not necessarily mean that + * hasLeadership() is the corresponding true/false value. It is possible for the state to change behind the scenes + * before these methods get called. The contract is that if that happens, you should see another call to the other + * method pretty quickly. + */ +public interface LeaderLatchListener +{ + /** + * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true. + * + * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If + * this occurs, you can expect {@link #notLeader()} to also be called. + */ + public void isLeader(); + + /** + * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false. + * + * Note that it is possible that by the time this method call happens, hasLeadership has become true. If + * this occurs, you can expect {@link #isLeader()} to also be called. 
+ */ + public void notLeader(); +} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 307c383..58a61ee 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -16,10 +16,13 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.recipes.leader; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.io.Closeables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.BaseClassForTests; @@ -39,6 +42,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class TestLeaderLatch extends BaseClassForTests { @@ -88,7 +92,7 @@ public class TestLeaderLatch extends BaseClassForTests { client.start(); - final CountDownLatch countDownLatch = new CountDownLatch(1); + final CountDownLatch countDownLatch = new CountDownLatch(1); client.getConnectionStateListenable().addListener ( new ConnectionStateListener() @@ -136,47 +140,47 @@ public class TestLeaderLatch extends BaseClassForTests @Test public void testCorrectWatching() throws Exception { - final int PARTICIPANT_QTY = 10; - 
final int PARTICIPANT_ID = 2; - - List latches = Lists.newArrayList(); + final int PARTICIPANT_QTY = 10; + final int PARTICIPANT_ID = 2; + + List latches = Lists.newArrayList(); final Timing timing = new Timing(); final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); try { - client.start(); - - for ( int i = 0; i < PARTICIPANT_QTY; ++i ) - { - LeaderLatch latch = new LeaderLatch(client, PATH_NAME); - latch.start(); - latches.add(latch); - } - - waitForALeader(latches, timing); - - //we need to close a Participant that doesn't be actual leader (first Participant) nor the last - latches.get(PARTICIPANT_ID).close(); - - //As the previous algorithm assumed that if the watched node is deleted gets the leadership - //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected as leader. - Assert.assertTrue(!latches.get(PARTICIPANT_ID-1).hasLeadership()); - } - finally - { - //removes the already closed participant - latches.remove(PARTICIPANT_ID); - - for ( LeaderLatch latch : latches ) - { - Closeables.closeQuietly(latch); - } - Closeables.closeQuietly(client); - } + client.start(); + + for ( int i = 0; i < PARTICIPANT_QTY; ++i ) + { + LeaderLatch latch = new LeaderLatch(client, PATH_NAME); + latch.start(); + latches.add(latch); + } + + waitForALeader(latches, timing); + + //we need to close a Participant that doesn't be actual leader (first Participant) nor the last + latches.get(PARTICIPANT_ID).close(); + + //As the previous algorithm assumed that if the watched node is deleted gets the leadership + //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected as leader. 
+ Assert.assertTrue(!latches.get(PARTICIPANT_ID - 1).hasLeadership()); + } + finally + { + //removes the already closed participant + latches.remove(PARTICIPANT_ID); + + for ( LeaderLatch latch : latches ) + { + Closeables.closeQuietly(latch); + } + Closeables.closeQuietly(client); + } } - + @Test public void testWaiting() throws Exception { @@ -244,6 +248,93 @@ public class TestLeaderLatch extends BaseClassForTests basic(Mode.START_IN_THREADS); } + @Test + public void testCallbackSanity() throws Exception + { + final int PARTICIPANT_QTY = 10; + final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY); + final AtomicLong masterCounter = new AtomicLong(0); + final AtomicLong dunceCounter = new AtomicLong(0); + + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build()); + + List latches = Lists.newArrayList(); + for ( int i = 0; i < PARTICIPANT_QTY; ++i ) + { + final LeaderLatch latch = new LeaderLatch(client, PATH_NAME); + latch.addListener( + new LeaderLatchListener() + { + boolean beenLeader = false; + + @Override + public void isLeader() + { + if ( !beenLeader ) + { + masterCounter.incrementAndGet(); + beenLeader = true; + try + { + latch.reset(); + } + catch ( Exception e ) + { + throw Throwables.propagate(e); + } + } + else + { + masterCounter.incrementAndGet(); + Closeables.closeQuietly(latch); + timesSquare.countDown(); + } + } + + @Override + public void notLeader() + { + dunceCounter.incrementAndGet(); + } + }, + exec + ); + latches.add(latch); + } + + try + { + client.start(); + + for ( LeaderLatch latch : latches ) + { + latch.start(); + } + + timesSquare.await(); + + Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2); + Assert.assertEquals(dunceCounter.get(), 
PARTICIPANT_QTY); + for ( LeaderLatch latch : latches ) + { + Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED); + } + } + finally + { + for ( LeaderLatch latch : latches ) + { + if ( latch.getState() != LeaderLatch.State.CLOSED ) + { + Closeables.closeQuietly(latch); + } + } + Closeables.closeQuietly(client); + } + } + private enum Mode { START_IMMEDIATELY, @@ -277,18 +368,18 @@ public class TestLeaderLatch extends BaseClassForTests for ( final LeaderLatch latch : latches ) { service.submit - ( - new Callable() - { - @Override - public Object call() throws Exception + ( + new Callable() { - Thread.sleep((int)(100 * Math.random())); - latch.start(); - return null; + @Override + public Object call() throws Exception + { + Thread.sleep((int)(100 * Math.random())); + latch.start(); + return null; + } } - } - ); + ); } service.shutdown(); }