Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8644110987 for ; Tue, 10 Sep 2013 22:59:48 +0000 (UTC) Received: (qmail 25940 invoked by uid 500); 10 Sep 2013 22:59:48 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 25912 invoked by uid 500); 10 Sep 2013 22:59:48 -0000 Mailing-List: contact commits-help@curator.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.incubator.apache.org Delivered-To: mailing list commits@curator.incubator.apache.org Received: (qmail 25905 invoked by uid 99); 10 Sep 2013 22:59:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Sep 2013 22:59:48 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 10 Sep 2013 22:59:46 +0000 Received: (qmail 22723 invoked by uid 99); 10 Sep 2013 22:59:26 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Sep 2013 22:59:26 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D90E78BD10E; Tue, 10 Sep 2013 22:59:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: It was not possible to correctly handle connection state issues with the LeaderSelectorListener. There was an edge case where if the connection was lost before takeLeadership() was called, the thread would not be known and the existing sample Date: Tue, 10 Sep 2013 22:59:25 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master 69b824e5c -> 37cf6524c It was not possible to correctly handle connection state issues with the LeaderSelectorListener. There was an edge case where if the connection was lost before takeLeadership() was called, the thread would not be known and the existing sample code on how to handle stateChanged would not work. Introduced CancelLeadershipException to signal to the LeaderSelector instance that it should cancel the leadership. Also added a direct method of canceling leadership. Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/37cf6524 Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/37cf6524 Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/37cf6524 Branch: refs/heads/master Commit: 37cf6524cbe5315bbaf3e5d94c637529fe1f4c6e Parents: 69b824e Author: jordan.zimmerman Authored: Tue Sep 10 17:56:09 2013 -0500 Committer: jordan.zimmerman Committed: Tue Sep 10 17:56:09 2013 -0500 ---------------------------------------------------------------------- curator-examples/pom.xml | 2 - .../src/main/java/leader/ExampleClient.java | 10 +- .../leader/CancelLeadershipException.java | 54 +++++ .../recipes/leader/LeaderSelector.java | 91 +++++++-- .../site/confluence/leader-election.confluence | 9 +- .../recipes/leader/TestLeaderSelector.java | 196 +++++++++++++++---- 6 files changed, 294 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-examples/pom.xml ---------------------------------------------------------------------- diff --git a/curator-examples/pom.xml b/curator-examples/pom.xml index ddbd744..c4d060a 100644 --- a/curator-examples/pom.xml +++ b/curator-examples/pom.xml @@ -27,9 +27,7 @@ 2.2.1-incubating-SNAPSHOT - org.apache.curator curator-examples - 2.2.1-incubating-SNAPSHOT Curator Examples Example usages of various Curator features. http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-examples/src/main/java/leader/ExampleClient.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/leader/ExampleClient.java b/curator-examples/src/main/java/leader/ExampleClient.java index 8a70956..eebe5c0 100644 --- a/curator-examples/src/main/java/leader/ExampleClient.java +++ b/curator-examples/src/main/java/leader/ExampleClient.java @@ -19,6 +19,7 @@ package leader; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.CancelLeadershipException; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; @@ -36,8 +37,6 @@ public class ExampleClient implements Closeable, LeaderSelectorListener private final LeaderSelector leaderSelector; private final AtomicInteger leaderCount = new AtomicInteger(); - private volatile Thread ourThread = null; - public ExampleClient(CuratorFramework client, String path, String name) { this.name = name; @@ -71,7 +70,6 @@ public class ExampleClient implements Closeable, LeaderSelectorListener final int waitSeconds = (int)(5 * Math.random()) + 1; - ourThread = Thread.currentThread(); System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds..."); System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before."); try @@ -85,7 +83,6 @@ public class ExampleClient implements Closeable, LeaderSelectorListener } finally { - ourThread = null; System.out.println(name + " relinquishing leadership.\n"); } } @@ -97,10 +94,7 @@ public class ExampleClient implements Closeable, LeaderSelectorListener if ( (newState == ConnectionState.LOST) || (newState == ConnectionState.SUSPENDED) ) { - if ( ourThread != null ) - { - ourThread.interrupt(); - } + throw new CancelLeadershipException(); } } } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/CancelLeadershipException.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/CancelLeadershipException.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/CancelLeadershipException.java new file mode 100644 index 0000000..dc6541d --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/CancelLeadershipException.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.leader; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; + +/** + * When thrown from {@link LeaderSelectorListener#stateChanged(CuratorFramework, ConnectionState)}, will + * cause {@link LeaderSelector#interruptLeadership()} to get called. IMPORTANT: this is only supported + * when thrown from {@link LeaderSelectorListener#stateChanged(CuratorFramework, ConnectionState)}. + */ +public class CancelLeadershipException extends RuntimeException +{ + public CancelLeadershipException() + { + } + + public CancelLeadershipException(String message) + { + super(message); + } + + public CancelLeadershipException(String message, Throwable cause) + { + super(message, cause); + } + + public CancelLeadershipException(Throwable cause) + { + super(cause); + } + + public CancelLeadershipException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) + { + super(message, cause, enableSuppression, writableStackTrace); + } +} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java index c8247ca..ffbb0da 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException; @@ -36,9 +37,11 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -67,10 +70,15 @@ public class LeaderSelector implements Closeable private final InterProcessMutex mutex; private final AtomicReference state = new AtomicReference(State.LATENT); private final AtomicBoolean autoRequeue = new AtomicBoolean(false); + private final AtomicReference> ourTask = new AtomicReference>(null); private volatile boolean hasLeadership; private volatile String id = ""; + @VisibleForTesting + volatile CountDownLatch debugLeadershipLatch = null; + volatile CountDownLatch debugLeadershipWaitLatch = null; + private enum State { LATENT, @@ -121,7 +129,7 @@ public class LeaderSelector implements Closeable Preconditions.checkNotNull(listener, "listener cannot be null"); this.client = client; - this.listener = listener; + this.listener = new WrappedListener(this, listener); hasLeadership = false; this.executorService = new CloseableExecutorService(executorService); @@ -212,18 +220,19 @@ public class LeaderSelector implements Closeable if ( !isQueued ) { isQueued = true; - executorService.submit - ( - new Callable() + Future task = executorService.submit + ( + new Callable() + { + @Override + public Void call() throws Exception { - @Override - public Object call() throws Exception - { - doWorkLoop(); - return null; - } + doWorkLoop(); + return null; } - ); + } + ); + ourTask.set(task); return true; } @@ -233,12 +242,13 @@ public class LeaderSelector implements Closeable /** * Shutdown this selector and remove yourself from the leadership group */ - public void close() + public synchronized void close() { Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); client.getConnectionStateListenable().removeListener(listener); executorService.close(); + ourTask.set(null); } /** @@ -325,6 +335,18 @@ public class LeaderSelector implements Closeable return hasLeadership; } + /** + * Attempt to cancel and interrupt the current leadership if this instance has leadership + */ + public synchronized void interruptLeadership() + { + Future task = ourTask.get(); + if ( task != null ) + { + task.cancel(true); + } + } + private static Participant participantForPath(CuratorFramework client, String path, boolean markAsLeader) throws Exception { byte[] bytes = client.getData().forPath(path); @@ -343,6 +365,14 @@ public class LeaderSelector implements Closeable hasLeadership = true; try { + if ( debugLeadershipLatch != null ) + { + debugLeadershipLatch.countDown(); + } + if ( debugLeadershipWaitLatch != null ) + { + debugLeadershipWaitLatch.await(); + } listener.takeLeadership(client); } catch ( InterruptedException e ) @@ -400,7 +430,11 @@ public class LeaderSelector implements Closeable } catch ( InterruptedException ignore ) { - Thread.currentThread().interrupt(); + Future task = ourTask.get(); + if ( (task == null) || !task.isCancelled() ) // if interruptLeadership() was called, not re-set the interrupt state of the thread + { + Thread.currentThread().interrupt(); + } break; } if ( (exception != null) && !autoRequeue.get() ) // autoRequeue should ignore connection loss or session expired and just keep trying @@ -469,4 +503,35 @@ public class LeaderSelector implements Closeable } }; } + + private static class WrappedListener implements LeaderSelectorListener + { + private final LeaderSelector leaderSelector; + private final LeaderSelectorListener listener; + + public WrappedListener(LeaderSelector leaderSelector, LeaderSelectorListener listener) + { + this.leaderSelector = leaderSelector; + this.listener = listener; + } + + @Override + public void takeLeadership(CuratorFramework client) throws Exception + { + listener.takeLeadership(client); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + try + { + listener.stateChanged(client, newState); + } + catch ( CancelLeadershipException dummy ) + { + leaderSelector.interruptLeadership(); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-recipes/src/site/confluence/leader-election.confluence ---------------------------------------------------------------------- diff --git a/curator-recipes/src/site/confluence/leader-election.confluence b/curator-recipes/src/site/confluence/leader-election.confluence index ddc1636..15ecdcc 100644 --- a/curator-recipes/src/site/confluence/leader-election.confluence +++ b/curator-recipes/src/site/confluence/leader-election.confluence @@ -8,6 +8,7 @@ NOTE: Curator has two leader election recipes. Which one to use depends on your h2. Participating Classes * LeaderSelector * LeaderSelectorListener +* CancelLeadershipException h2. Usage h3. Creating a LeaderSelector @@ -49,6 +50,10 @@ leaderSelector.close(); {code} h2. Error Handling -The {{LeaderSelectorListener}} class extends {{ConnectionStateListener}}. When the LeaderSelector is started, it adds the listener to the Curator instance. Users of the {{LeaderSelector}} must pay attention to any connection state changes. If an instance becomes the leader, it should respond to notification of being SUSPENDED or LOST. +The {{LeaderSelectorListener}} class extends {{ConnectionStateListener}}. When the LeaderSelector is started, it adds the listener to the Curator instance. +Users of the {{LeaderSelector}} must pay attention to any connection state changes. If an instance becomes the leader, it should respond to notification of +being SUSPENDED or LOST. If the SUSPENDED state is reported, the instance must assume that it might no longer be the leader until it receives a RECONNECTED state. If the LOST +state is reported, the instance is no longer the leader and its {{takeLeadership}} method should exit. -If the SUSPENDED state is reported, the instance must assume that it might no longer be the leader until it receives a RECONNECTED state. If the LOST state is reported, the instance is no longer the leader and its {{takeLeadership}} method should exit. \ No newline at end of file +IMPORTANT: The recommended action for receiving SUSPENDED or LOST is to throw {{CancelLeadershipException}}. This will cause the LeaderSelector instance to attempt +to interrupt and cancel the thread that is executing the {{takeLeadership}} method. http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java index 9b9aac5..f0c703a 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.recipes.leader; import com.google.common.collect.Lists; @@ -26,6 +27,7 @@ import org.apache.curator.framework.recipes.BaseClassForTests; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.KillSession; +import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.testng.Assert; import org.testng.annotations.Test; @@ -41,19 +43,130 @@ import java.util.concurrent.atomic.AtomicInteger; public class TestLeaderSelector extends BaseClassForTests { - private static final String PATH_NAME = "/one/two/me"; + private static final String PATH_NAME = "/one/two/me"; + + @Test + public void testInterruptLeadership() throws Exception + { + LeaderSelector selector = null; + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + try + { + client.start(); + + final CountDownLatch isLeaderLatch = new CountDownLatch(1); + final CountDownLatch losingLeaderLatch = new CountDownLatch(1); + LeaderSelectorListener listener = new LeaderSelectorListener() + { + @Override + public void takeLeadership(CuratorFramework client) throws Exception + { + isLeaderLatch.countDown(); + try + { + Thread.currentThread().join(); + } + finally + { + losingLeaderLatch.countDown(); + } + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + } + }; + + selector = new LeaderSelector(client, "/leader", listener); + selector.start(); + + Assert.assertTrue(timing.awaitLatch(isLeaderLatch)); + selector.interruptLeadership(); + Assert.assertTrue(timing.awaitLatch(losingLeaderLatch)); + } + finally + { + Closeables.closeQuietly(selector); + Closeables.closeQuietly(client); + } + } + + @Test + public void testRaceAtStateChanged() throws Exception + { + LeaderSelector selector = null; + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + try + { + client.start(); + + final CountDownLatch takeLeadershipLatch = new CountDownLatch(1); + final CountDownLatch lostLatch = new CountDownLatch(1); + final CountDownLatch reconnectedLatch = new CountDownLatch(1); + LeaderSelectorListener listener = new LeaderSelectorListener() + { + @Override + public void takeLeadership(CuratorFramework client) throws Exception + { + takeLeadershipLatch.countDown(); // should never get here + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( newState == ConnectionState.RECONNECTED ) + { + reconnectedLatch.countDown(); + } + else if ( newState == ConnectionState.LOST ) + { + lostLatch.countDown(); + throw new CancelLeadershipException(); + } + } + }; + + selector = new LeaderSelector(client, "/leader", listener); + CountDownLatch debugLeadershipLatch = new CountDownLatch(1); + CountDownLatch debugLeadershipWaitLatch = new CountDownLatch(1); + selector.debugLeadershipLatch = debugLeadershipLatch; + selector.debugLeadershipWaitLatch = debugLeadershipWaitLatch; + + selector.start(); + + Assert.assertTrue(timing.awaitLatch(debugLeadershipLatch)); + server.stop(); + Assert.assertTrue(timing.awaitLatch(lostLatch)); + timing.sleepABit(); + debugLeadershipWaitLatch.countDown(); + + server = new TestingServer(server.getPort(), server.getTempDirectory()); + Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); + + Assert.assertFalse(takeLeadershipLatch.await(3, TimeUnit.SECONDS)); + } + finally + { + Closeables.closeQuietly(selector); + Closeables.closeQuietly(client); + } + } @Test - public void testAutoRequeue() throws Exception + public void testAutoRequeue() throws Exception { - LeaderSelector selector = null; - CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).sessionTimeoutMs(1000).build(); + Timing timing = new Timing(); + LeaderSelector selector = null; + CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).sessionTimeoutMs(timing.session()).build(); try { client.start(); - final Semaphore semaphore = new Semaphore(0); - LeaderSelectorListener listener = new LeaderSelectorListener() + final Semaphore semaphore = new Semaphore(0); + LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception @@ -70,8 +183,8 @@ public class TestLeaderSelector extends BaseClassForTests selector = new LeaderSelector(client, "/leader", listener); selector.autoRequeue(); selector.start(); - - Assert.assertTrue(semaphore.tryAcquire(2, 10, TimeUnit.SECONDS)); + + Assert.assertTrue(timing.acquireSemaphore(semaphore, 2)); } finally { @@ -79,18 +192,18 @@ public class TestLeaderSelector extends BaseClassForTests Closeables.closeQuietly(client); } } - + @Test - public void testServerDying() throws Exception + public void testServerDying() throws Exception { - Timing timing = new Timing(); - LeaderSelector selector = null; - CuratorFramework client = CuratorFrameworkFactory.builder().connectionTimeoutMs(timing.connection()).connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).sessionTimeoutMs(timing.session()).build(); + Timing timing = new Timing(); + LeaderSelector selector = null; + CuratorFramework client = CuratorFrameworkFactory.builder().connectionTimeoutMs(timing.connection()).connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).sessionTimeoutMs(timing.session()).build(); client.start(); try { - final Semaphore semaphore = new Semaphore(0); - LeaderSelectorListener listener = new LeaderSelectorListener() + final Semaphore semaphore = new Semaphore(0); + LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception @@ -125,28 +238,25 @@ public class TestLeaderSelector extends BaseClassForTests } @Test - public void testKillSession() throws Exception + public void testKillSession() throws Exception { - final Timing timing = new Timing(); + final Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); client.start(); try { - final Semaphore semaphore = new Semaphore(0); - final CountDownLatch interruptedLatch = new CountDownLatch(1); - final AtomicInteger leaderCount = new AtomicInteger(0); - LeaderSelectorListener listener = new LeaderSelectorListener() + final Semaphore semaphore = new Semaphore(0); + final CountDownLatch interruptedLatch = new CountDownLatch(1); + final AtomicInteger leaderCount = new AtomicInteger(0); + LeaderSelectorListener listener = new LeaderSelectorListener() { - private volatile Thread ourThread; - @Override public void takeLeadership(CuratorFramework client) throws Exception { leaderCount.incrementAndGet(); try { - ourThread = Thread.currentThread(); semaphore.release(); try { @@ -167,9 +277,9 @@ public class TestLeaderSelector extends BaseClassForTests @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { - if ( (newState == ConnectionState.LOST) && (ourThread != null) ) + if ( newState == ConnectionState.LOST ) { - ourThread.interrupt(); + throw new CancelLeadershipException(); } } }; @@ -202,13 +312,13 @@ public class TestLeaderSelector extends BaseClassForTests } @Test - public void testClosing() throws Exception + public void testClosing() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); client.start(); try { - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); LeaderSelector leaderSelector1 = new LeaderSelector(client, PATH_NAME, new LeaderSelectorListener() { @Override @@ -223,7 +333,7 @@ public class TestLeaderSelector extends BaseClassForTests } }); - LeaderSelector leaderSelector2 = new LeaderSelector(client, PATH_NAME, new LeaderSelectorListener() + LeaderSelector leaderSelector2 = new LeaderSelector(client, PATH_NAME, new LeaderSelectorListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) @@ -247,8 +357,8 @@ public class TestLeaderSelector extends BaseClassForTests Assert.assertNotSame(leaderSelector1.hasLeadership(), leaderSelector2.hasLeadership()); - LeaderSelector positiveLeader; - LeaderSelector negativeLeader; + LeaderSelector positiveLeader; + LeaderSelector negativeLeader; if ( leaderSelector1.hasLeadership() ) { positiveLeader = leaderSelector1; @@ -277,22 +387,22 @@ public class TestLeaderSelector extends BaseClassForTests @SuppressWarnings({"ForLoopReplaceableByForEach"}) @Test - public void testRotatingLeadership() throws Exception + public void testRotatingLeadership() throws Exception { - final int LEADER_QTY = 5; - final int REPEAT_QTY = 3; + final int LEADER_QTY = 5; + final int REPEAT_QTY = 3; - final Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + final Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); client.start(); try { - final BlockingQueue leaderList = new LinkedBlockingQueue(); - List selectors = Lists.newArrayList(); + final BlockingQueue leaderList = new LinkedBlockingQueue(); + List selectors = Lists.newArrayList(); for ( int i = 0; i < LEADER_QTY; ++i ) { - final int ourIndex = i; - LeaderSelector leaderSelector = new LeaderSelector(client, PATH_NAME, new LeaderSelectorListener() + final int ourIndex = i; + LeaderSelector leaderSelector = new LeaderSelector(client, PATH_NAME, new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception @@ -309,7 +419,7 @@ public class TestLeaderSelector extends BaseClassForTests selectors.add(leaderSelector); } - List localLeaderList = Lists.newArrayList(); + List localLeaderList = Lists.newArrayList(); for ( int i = 1; i <= REPEAT_QTY; ++i ) { for ( LeaderSelector leaderSelector : selectors ) @@ -341,12 +451,12 @@ public class TestLeaderSelector extends BaseClassForTests for ( int i = 0; i < REPEAT_QTY; ++i ) { - Set uniques = Sets.newHashSet(); + Set uniques = Sets.newHashSet(); for ( int j = 0; j < selectors.size(); ++j ) { Assert.assertTrue(localLeaderList.size() > 0); - int thisIndex = localLeaderList.remove(0); + int thisIndex = localLeaderList.remove(0); Assert.assertFalse(uniques.contains(thisIndex)); uniques.add(thisIndex); }