curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject git commit: It was not possible to correctly handle connection state issues with the LeaderSelectorListener. There was an edge case where if the connection was lost before takeLeadership() was called, the thread would not be known and the existing sample
Date Tue, 10 Sep 2013 22:59:25 GMT
Updated Branches:
  refs/heads/master 69b824e5c -> 37cf6524c


It was not possible to correctly handle connection state issues with the LeaderSelectorListener.
There was an edge case where if
the connection was lost before takeLeadership() was called, the thread would not be known
and the existing sample code on
how to handle stateChanged would not work.

Introduced CancelLeadershipException to signal to the LeaderSelector instance that it should
cancel the leadership. Also
added a direct method of canceling leadership.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/37cf6524
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/37cf6524
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/37cf6524

Branch: refs/heads/master
Commit: 37cf6524cbe5315bbaf3e5d94c637529fe1f4c6e
Parents: 69b824e
Author: jordan.zimmerman <jordan.zimmerman@riotgames.com>
Authored: Tue Sep 10 17:56:09 2013 -0500
Committer: jordan.zimmerman <jordan.zimmerman@riotgames.com>
Committed: Tue Sep 10 17:56:09 2013 -0500

----------------------------------------------------------------------
 curator-examples/pom.xml                        |   2 -
 .../src/main/java/leader/ExampleClient.java     |  10 +-
 .../leader/CancelLeadershipException.java       |  54 +++++
 .../recipes/leader/LeaderSelector.java          |  91 +++++++--
 .../site/confluence/leader-election.confluence  |   9 +-
 .../recipes/leader/TestLeaderSelector.java      | 196 +++++++++++++++----
 6 files changed, 294 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-examples/pom.xml
----------------------------------------------------------------------
diff --git a/curator-examples/pom.xml b/curator-examples/pom.xml
index ddbd744..c4d060a 100644
--- a/curator-examples/pom.xml
+++ b/curator-examples/pom.xml
@@ -27,9 +27,7 @@
         <version>2.2.1-incubating-SNAPSHOT</version>
     </parent>
 
-    <groupId>org.apache.curator</groupId>
     <artifactId>curator-examples</artifactId>
-    <version>2.2.1-incubating-SNAPSHOT</version>
 
     <name>Curator Examples</name>
     <description>Example usages of various Curator features.</description>

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-examples/src/main/java/leader/ExampleClient.java
----------------------------------------------------------------------
diff --git a/curator-examples/src/main/java/leader/ExampleClient.java b/curator-examples/src/main/java/leader/ExampleClient.java
index 8a70956..eebe5c0 100644
--- a/curator-examples/src/main/java/leader/ExampleClient.java
+++ b/curator-examples/src/main/java/leader/ExampleClient.java
@@ -19,6 +19,7 @@
 package leader;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
 import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
 import org.apache.curator.framework.state.ConnectionState;
@@ -36,8 +37,6 @@ public class ExampleClient implements Closeable, LeaderSelectorListener
     private final LeaderSelector leaderSelector;
     private final AtomicInteger leaderCount = new AtomicInteger();
 
-    private volatile Thread     ourThread = null;
-
     public ExampleClient(CuratorFramework client, String path, String name)
     {
         this.name = name;
@@ -71,7 +70,6 @@ public class ExampleClient implements Closeable, LeaderSelectorListener
 
         final int         waitSeconds = (int)(5 * Math.random()) + 1;
 
-        ourThread = Thread.currentThread();
         System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
         System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + "
time(s) before.");
         try
@@ -85,7 +83,6 @@ public class ExampleClient implements Closeable, LeaderSelectorListener
         }
         finally
         {
-            ourThread = null;
             System.out.println(name + " relinquishing leadership.\n");
         }
     }
@@ -97,10 +94,7 @@ public class ExampleClient implements Closeable, LeaderSelectorListener
 
         if ( (newState == ConnectionState.LOST) || (newState == ConnectionState.SUSPENDED)
)
         {
-            if ( ourThread != null )
-            {
-                ourThread.interrupt();
-            }
+            throw new CancelLeadershipException();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/CancelLeadershipException.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/CancelLeadershipException.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/CancelLeadershipException.java
new file mode 100644
index 0000000..dc6541d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/CancelLeadershipException.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.leader;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+
+/**
+ * When thrown from {@link LeaderSelectorListener#stateChanged(CuratorFramework, ConnectionState)},
will
+ * cause {@link LeaderSelector#interruptLeadership()} to get called. IMPORTANT: this is only
supported
+ * when thrown from {@link LeaderSelectorListener#stateChanged(CuratorFramework, ConnectionState)}.
+ */
+public class CancelLeadershipException extends RuntimeException
+{
+    public CancelLeadershipException()
+    {
+    }
+
+    public CancelLeadershipException(String message)
+    {
+        super(message);
+    }
+
+    public CancelLeadershipException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    public CancelLeadershipException(Throwable cause)
+    {
+        super(cause);
+    }
+
+    public CancelLeadershipException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace)
+    {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
index c8247ca..ffbb0da 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException;
@@ -36,9 +37,11 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -67,10 +70,15 @@ public class LeaderSelector implements Closeable
     private final InterProcessMutex mutex;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final AtomicBoolean autoRequeue = new AtomicBoolean(false);
+    private final AtomicReference<Future<?>> ourTask = new AtomicReference<Future<?>>(null);
 
     private volatile boolean hasLeadership;
     private volatile String id = "";
 
+    @VisibleForTesting
+    volatile CountDownLatch debugLeadershipLatch = null;
+    volatile CountDownLatch debugLeadershipWaitLatch = null;
+
     private enum State
     {
         LATENT,
@@ -121,7 +129,7 @@ public class LeaderSelector implements Closeable
         Preconditions.checkNotNull(listener, "listener cannot be null");
 
         this.client = client;
-        this.listener = listener;
+        this.listener = new WrappedListener(this, listener);
         hasLeadership = false;
 
         this.executorService = new CloseableExecutorService(executorService);
@@ -212,18 +220,19 @@ public class LeaderSelector implements Closeable
         if ( !isQueued )
         {
             isQueued = true;
-            executorService.submit
-                (
-                    new Callable<Object>()
+            Future<Void> task = executorService.submit
+            (
+                new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
                     {
-                        @Override
-                        public Object call() throws Exception
-                        {
-                            doWorkLoop();
-                            return null;
-                        }
+                        doWorkLoop();
+                        return null;
                     }
-                );
+                }
+            );
+            ourTask.set(task);
 
             return true;
         }
@@ -233,12 +242,13 @@ public class LeaderSelector implements Closeable
     /**
      * Shutdown this selector and remove yourself from the leadership group
      */
-    public void close()
+    public synchronized void close()
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already
closed or has not been started");
 
         client.getConnectionStateListenable().removeListener(listener);
         executorService.close();
+        ourTask.set(null);
     }
 
     /**
@@ -325,6 +335,18 @@ public class LeaderSelector implements Closeable
         return hasLeadership;
     }
 
+    /**
+     * Attempt to cancel and interrupt the current leadership if this instance has leadership
+     */
+    public synchronized void interruptLeadership()
+    {
+        Future<?> task = ourTask.get();
+        if ( task != null )
+        {
+            task.cancel(true);
+        }
+    }
+
     private static Participant participantForPath(CuratorFramework client, String path, boolean
markAsLeader) throws Exception
     {
         byte[] bytes = client.getData().forPath(path);
@@ -343,6 +365,14 @@ public class LeaderSelector implements Closeable
             hasLeadership = true;
             try
             {
+                if ( debugLeadershipLatch != null )
+                {
+                    debugLeadershipLatch.countDown();
+                }
+                if ( debugLeadershipWaitLatch != null )
+                {
+                    debugLeadershipWaitLatch.await();
+                }
                 listener.takeLeadership(client);
             }
             catch ( InterruptedException e )
@@ -400,7 +430,11 @@ public class LeaderSelector implements Closeable
             }
             catch ( InterruptedException ignore )
             {
-                Thread.currentThread().interrupt();
+                Future<?> task = ourTask.get();
+                if ( (task == null) || !task.isCancelled() )    // if interruptLeadership()
was called, not re-set the interrupt state of the thread
+                {
+                    Thread.currentThread().interrupt();
+                }
                 break;
             }
             if ( (exception != null) && !autoRequeue.get() )   // autoRequeue should
ignore connection loss or session expired and just keep trying
@@ -469,4 +503,35 @@ public class LeaderSelector implements Closeable
             }
         };
     }
+
+    private static class WrappedListener implements LeaderSelectorListener
+    {
+        private final LeaderSelector leaderSelector;
+        private final LeaderSelectorListener listener;
+
+        public WrappedListener(LeaderSelector leaderSelector, LeaderSelectorListener listener)
+        {
+            this.leaderSelector = leaderSelector;
+            this.listener = listener;
+        }
+
+        @Override
+        public void takeLeadership(CuratorFramework client) throws Exception
+        {
+            listener.takeLeadership(client);
+        }
+
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            try
+            {
+                listener.stateChanged(client, newState);
+            }
+            catch ( CancelLeadershipException dummy )
+            {
+                leaderSelector.interruptLeadership();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-recipes/src/site/confluence/leader-election.confluence
----------------------------------------------------------------------
diff --git a/curator-recipes/src/site/confluence/leader-election.confluence b/curator-recipes/src/site/confluence/leader-election.confluence
index ddc1636..15ecdcc 100644
--- a/curator-recipes/src/site/confluence/leader-election.confluence
+++ b/curator-recipes/src/site/confluence/leader-election.confluence
@@ -8,6 +8,7 @@ NOTE: Curator has two leader election recipes. Which one to use depends on
your
 h2. Participating Classes
 * LeaderSelector
 * LeaderSelectorListener
+* CancelLeadershipException
 
 h2. Usage
 h3. Creating a LeaderSelector
@@ -49,6 +50,10 @@ leaderSelector.close();
 {code}
 
 h2. Error Handling
-The {{LeaderSelectorListener}} class extends {{ConnectionStateListener}}. When the LeaderSelector
is started, it adds the listener to the Curator instance. Users of the {{LeaderSelector}}
must pay attention to any connection state changes. If an instance becomes the leader, it
should respond to notification of being SUSPENDED or LOST.
+The {{LeaderSelectorListener}} class extends {{ConnectionStateListener}}. When the LeaderSelector
is started, it adds the listener to the Curator instance.
+Users of the {{LeaderSelector}} must pay attention to any connection state changes. If an
instance becomes the leader, it should respond to notification of
+being SUSPENDED or LOST. If the SUSPENDED state is reported, the instance must assume that
it might no longer be the leader until it receives a RECONNECTED state. If the LOST
+state is reported, the instance is no longer the leader and its {{takeLeadership}} method
should exit.
 
-If the SUSPENDED state is reported, the instance must assume that it might no longer be the
leader until it receives a RECONNECTED state. If the LOST state is reported, the instance
is no longer the leader and its {{takeLeadership}} method should exit.
\ No newline at end of file
+IMPORTANT: The recommended action for receiving SUSPENDED or LOST is to throw {{CancelLeadershipException}}.
This will cause the LeaderSelector instance to attempt
+to interrupt and cancel the thread that is executing the {{takeLeadership}} method.

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/37cf6524/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index 9b9aac5..f0c703a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.leader;
 
 import com.google.common.collect.Lists;
@@ -26,6 +27,7 @@ import org.apache.curator.framework.recipes.BaseClassForTests;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -41,19 +43,130 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestLeaderSelector extends BaseClassForTests
 {
-    private static final String     PATH_NAME = "/one/two/me";
+    private static final String PATH_NAME = "/one/two/me";
+
+    @Test
+    public void testInterruptLeadership() throws Exception
+    {
+        LeaderSelector selector = null;
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final CountDownLatch isLeaderLatch = new CountDownLatch(1);
+            final CountDownLatch losingLeaderLatch = new CountDownLatch(1);
+            LeaderSelectorListener listener = new LeaderSelectorListener()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws Exception
+                {
+                    isLeaderLatch.countDown();
+                    try
+                    {
+                        Thread.currentThread().join();
+                    }
+                    finally
+                    {
+                        losingLeaderLatch.countDown();
+                    }
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                }
+            };
+
+            selector = new LeaderSelector(client, "/leader", listener);
+            selector.start();
+
+            Assert.assertTrue(timing.awaitLatch(isLeaderLatch));
+            selector.interruptLeadership();
+            Assert.assertTrue(timing.awaitLatch(losingLeaderLatch));
+        }
+        finally
+        {
+            Closeables.closeQuietly(selector);
+            Closeables.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testRaceAtStateChanged() throws Exception
+    {
+        LeaderSelector selector = null;
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final CountDownLatch takeLeadershipLatch = new CountDownLatch(1);
+            final CountDownLatch lostLatch = new CountDownLatch(1);
+            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            LeaderSelectorListener listener = new LeaderSelectorListener()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws Exception
+                {
+                    takeLeadershipLatch.countDown();  // should never get here
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState == ConnectionState.RECONNECTED )
+                    {
+                        reconnectedLatch.countDown();
+                    }
+                    else if ( newState == ConnectionState.LOST )
+                    {
+                        lostLatch.countDown();
+                        throw new CancelLeadershipException();
+                    }
+                }
+            };
+
+            selector = new LeaderSelector(client, "/leader", listener);
+            CountDownLatch debugLeadershipLatch = new CountDownLatch(1);
+            CountDownLatch debugLeadershipWaitLatch = new CountDownLatch(1);
+            selector.debugLeadershipLatch = debugLeadershipLatch;
+            selector.debugLeadershipWaitLatch = debugLeadershipWaitLatch;
+
+            selector.start();
+
+            Assert.assertTrue(timing.awaitLatch(debugLeadershipLatch));
+            server.stop();
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+            timing.sleepABit();
+            debugLeadershipWaitLatch.countDown();
+
+            server = new TestingServer(server.getPort(), server.getTempDirectory());
+            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+
+            Assert.assertFalse(takeLeadershipLatch.await(3, TimeUnit.SECONDS));
+        }
+        finally
+        {
+            Closeables.closeQuietly(selector);
+            Closeables.closeQuietly(client);
+        }
+    }
 
     @Test
-    public void     testAutoRequeue() throws Exception
+    public void testAutoRequeue() throws Exception
     {
-        LeaderSelector      selector = null;
-        CuratorFramework    client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new
RetryOneTime(1)).sessionTimeoutMs(1000).build();
+        Timing timing = new Timing();
+        LeaderSelector selector = null;
+        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new
RetryOneTime(1)).sessionTimeoutMs(timing.session()).build();
         try
         {
             client.start();
 
-            final Semaphore             semaphore = new Semaphore(0);
-            LeaderSelectorListener      listener = new LeaderSelectorListener()
+            final Semaphore semaphore = new Semaphore(0);
+            LeaderSelectorListener listener = new LeaderSelectorListener()
             {
                 @Override
                 public void takeLeadership(CuratorFramework client) throws Exception
@@ -70,8 +183,8 @@ public class TestLeaderSelector extends BaseClassForTests
             selector = new LeaderSelector(client, "/leader", listener);
             selector.autoRequeue();
             selector.start();
-            
-            Assert.assertTrue(semaphore.tryAcquire(2, 10, TimeUnit.SECONDS));
+
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 2));
         }
         finally
         {
@@ -79,18 +192,18 @@ public class TestLeaderSelector extends BaseClassForTests
             Closeables.closeQuietly(client);
         }
     }
-    
+
     @Test
-    public void     testServerDying() throws Exception
+    public void testServerDying() throws Exception
     {
-        Timing              timing = new Timing();
-        LeaderSelector      selector = null;
-        CuratorFramework    client = CuratorFrameworkFactory.builder().connectionTimeoutMs(timing.connection()).connectString(server.getConnectString()).retryPolicy(new
RetryOneTime(1)).sessionTimeoutMs(timing.session()).build();
+        Timing timing = new Timing();
+        LeaderSelector selector = null;
+        CuratorFramework client = CuratorFrameworkFactory.builder().connectionTimeoutMs(timing.connection()).connectString(server.getConnectString()).retryPolicy(new
RetryOneTime(1)).sessionTimeoutMs(timing.session()).build();
         client.start();
         try
         {
-            final Semaphore             semaphore = new Semaphore(0);
-            LeaderSelectorListener      listener = new LeaderSelectorListener()
+            final Semaphore semaphore = new Semaphore(0);
+            LeaderSelectorListener listener = new LeaderSelectorListener()
             {
                 @Override
                 public void takeLeadership(CuratorFramework client) throws Exception
@@ -125,28 +238,25 @@ public class TestLeaderSelector extends BaseClassForTests
     }
 
     @Test
-    public void     testKillSession() throws Exception
+    public void testKillSession() throws Exception
     {
-        final Timing        timing = new Timing();
+        final Timing timing = new Timing();
 
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
         client.start();
         try
         {
-            final Semaphore         semaphore = new Semaphore(0);
-            final CountDownLatch    interruptedLatch = new CountDownLatch(1);
-            final AtomicInteger     leaderCount = new AtomicInteger(0);
-            LeaderSelectorListener  listener = new LeaderSelectorListener()
+            final Semaphore semaphore = new Semaphore(0);
+            final CountDownLatch interruptedLatch = new CountDownLatch(1);
+            final AtomicInteger leaderCount = new AtomicInteger(0);
+            LeaderSelectorListener listener = new LeaderSelectorListener()
             {
-                private volatile Thread     ourThread;
-
                 @Override
                 public void takeLeadership(CuratorFramework client) throws Exception
                 {
                     leaderCount.incrementAndGet();
                     try
                     {
-                        ourThread = Thread.currentThread();
                         semaphore.release();
                         try
                         {
@@ -167,9 +277,9 @@ public class TestLeaderSelector extends BaseClassForTests
                 @Override
                 public void stateChanged(CuratorFramework client, ConnectionState newState)
                 {
-                    if ( (newState == ConnectionState.LOST) && (ourThread != null)
)
+                    if ( newState == ConnectionState.LOST )
                     {
-                        ourThread.interrupt();
+                        throw new CancelLeadershipException();
                     }
                 }
             };
@@ -202,13 +312,13 @@ public class TestLeaderSelector extends BaseClassForTests
     }
 
     @Test
-    public void     testClosing() throws Exception
+    public void testClosing() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
         client.start();
         try
         {
-            final CountDownLatch        latch = new CountDownLatch(1);
+            final CountDownLatch latch = new CountDownLatch(1);
             LeaderSelector leaderSelector1 = new LeaderSelector(client, PATH_NAME, new LeaderSelectorListener()
             {
                 @Override
@@ -223,7 +333,7 @@ public class TestLeaderSelector extends BaseClassForTests
                 }
             });
 
-            LeaderSelector      leaderSelector2 = new LeaderSelector(client, PATH_NAME, new
LeaderSelectorListener()
+            LeaderSelector leaderSelector2 = new LeaderSelector(client, PATH_NAME, new LeaderSelectorListener()
             {
                 @Override
                 public void stateChanged(CuratorFramework client, ConnectionState newState)
@@ -247,8 +357,8 @@ public class TestLeaderSelector extends BaseClassForTests
 
             Assert.assertNotSame(leaderSelector1.hasLeadership(), leaderSelector2.hasLeadership());
 
-            LeaderSelector      positiveLeader;
-            LeaderSelector      negativeLeader;
+            LeaderSelector positiveLeader;
+            LeaderSelector negativeLeader;
             if ( leaderSelector1.hasLeadership() )
             {
                 positiveLeader = leaderSelector1;
@@ -277,22 +387,22 @@ public class TestLeaderSelector extends BaseClassForTests
 
     @SuppressWarnings({"ForLoopReplaceableByForEach"})
     @Test
-    public void     testRotatingLeadership() throws Exception
+    public void testRotatingLeadership() throws Exception
     {
-        final int               LEADER_QTY = 5;
-        final int               REPEAT_QTY = 3;
+        final int LEADER_QTY = 5;
+        final int REPEAT_QTY = 3;
 
-        final Timing        timing = new Timing();
-        CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+        final Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
         client.start();
         try
         {
-            final BlockingQueue<Integer>    leaderList = new LinkedBlockingQueue<Integer>();
-            List<LeaderSelector>            selectors = Lists.newArrayList();
+            final BlockingQueue<Integer> leaderList = new LinkedBlockingQueue<Integer>();
+            List<LeaderSelector> selectors = Lists.newArrayList();
             for ( int i = 0; i < LEADER_QTY; ++i )
             {
-                final int           ourIndex = i;
-                LeaderSelector      leaderSelector = new LeaderSelector(client, PATH_NAME,
new LeaderSelectorListener()
+                final int ourIndex = i;
+                LeaderSelector leaderSelector = new LeaderSelector(client, PATH_NAME, new
LeaderSelectorListener()
                 {
                     @Override
                     public void takeLeadership(CuratorFramework client) throws Exception
@@ -309,7 +419,7 @@ public class TestLeaderSelector extends BaseClassForTests
                 selectors.add(leaderSelector);
             }
 
-            List<Integer>                 localLeaderList = Lists.newArrayList();
+            List<Integer> localLeaderList = Lists.newArrayList();
             for ( int i = 1; i <= REPEAT_QTY; ++i )
             {
                 for ( LeaderSelector leaderSelector : selectors )
@@ -341,12 +451,12 @@ public class TestLeaderSelector extends BaseClassForTests
 
             for ( int i = 0; i < REPEAT_QTY; ++i )
             {
-                Set<Integer>        uniques = Sets.newHashSet();
+                Set<Integer> uniques = Sets.newHashSet();
                 for ( int j = 0; j < selectors.size(); ++j )
                 {
                     Assert.assertTrue(localLeaderList.size() > 0);
 
-                    int     thisIndex = localLeaderList.remove(0);
+                    int thisIndex = localLeaderList.remove(0);
                     Assert.assertFalse(uniques.contains(thisIndex));
                     uniques.add(thisIndex);
                 }


Mime
View raw message