aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zma...@apache.org
Subject aurora git commit: Make leader elections resilient to ZK disconnections.
Date Mon, 23 Jan 2017 22:39:16 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 75129b694 -> 1e2a9e160


Make leader elections resilient to ZK disconnections.

As documented in AURORA-1840, the Curator `LeaderLatch` recipe abdicates
leadership if the ZK connection is lost or if there is a timeout. This is not
compatible with the commons-based implementation, which would only abdicate
leadership if a ZK session timeout occurred.

This replaces the `LeaderLatch` recipe with the `LeaderSelector` recipe and a
custom listener that only gives up leadership when the ZK session is lost
(`LOST`), not on a mere connection loss (`SUSPENDED`).

Bugs closed: AURORA-1669

Reviewed at https://reviews.apache.org/r/54288/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/1e2a9e16
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/1e2a9e16
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/1e2a9e16

Branch: refs/heads/master
Commit: 1e2a9e160a41c3d916ff1a152b1d00e5b1ee380d
Parents: 75129b6
Author: Zameer Manji <zmanji@apache.org>
Authored: Mon Jan 23 14:38:56 2017 -0800
Committer: Zameer Manji <zmanji@apache.org>
Committed: Mon Jan 23 14:38:56 2017 -0800

----------------------------------------------------------------------
 .../zookeeper/testing/ZooKeeperTestServer.java  |  2 +-
 .../discovery/CuratorSingletonService.java      | 56 ++++++++++++++----
 .../discovery/BaseCuratorDiscoveryTest.java     |  5 ++
 .../discovery/CuratorSingletonServiceTest.java  | 60 ++++++++++++++++++--
 4 files changed, 107 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/1e2a9e16/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
index 50acaeb..29204cd 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
@@ -82,7 +82,7 @@ public class ZooKeeperTestServer {
   /**
    * Starts zookeeper back up on the last used port.
    */
-  final void restartNetwork() throws IOException, InterruptedException {
+  public final void restartNetwork() throws IOException, InterruptedException {
     checkEphemeralPortAssigned();
     Preconditions.checkState(connectionFactory == null);
     startNetwork();

http://git-wip-us.apache.org/repos/asf/aurora/blob/1e2a9e16/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
index c378172..2847c41 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
@@ -17,6 +17,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Maps;
@@ -28,18 +30,25 @@ import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.thrift.Status;
 import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
 import org.apache.curator.framework.recipes.nodes.PersistentNode;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
 class CuratorSingletonService implements SingletonService {
 
+  private static final Logger LOG = LoggerFactory.getLogger(CuratorSingletonService.class);
+
   // This is the complement of the CuratorServiceGroupMonitor, it allows advertisement of
a leader
   // in a service group.
   private static class Advertiser {
@@ -151,11 +160,22 @@ class CuratorSingletonService implements SingletonService {
     requireNonNull(additionalEndpoints);
     requireNonNull(listener);
 
-    LeaderLatch leaderLatch = new LeaderLatch(client, groupPath, endpoint.getHostName());
     Closer closer = Closer.create();
-    leaderLatch.addListener(new LeaderLatchListener() {
+
+    CountDownLatch giveUpLeadership = new CountDownLatch(1);
+
+    // We do not use the suggested `LeaderSelectorListenerAdapter` or the LeaderLatch class
+    // because we want to have precise control over state changes. By default the listener
and the
+    // latch class treat `SUSPENDED` (connection loss) as fatal and a reason to lose leadership.
+    // To make the scheduler resilient to connection blips and long GC pauses, we only treat
+    // `LOST` (session loss) as fatal.
+
+    ExecutorService executor =
+        AsyncUtil.singleThreadLoggingScheduledExecutor("LeaderSelector-%d", LOG);
+
+    LeaderSelectorListener leaderSelectorListener = new LeaderSelectorListener() {
       @Override
-      public void isLeader() {
+      public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
         listener.onLeading(new LeaderControl() {
           @Override
           public void advertise() throws AdvertiseException, InterruptedException {
@@ -165,27 +185,43 @@ class CuratorSingletonService implements SingletonService {
           @Override
           public void leave() throws LeaveException {
             try {
+              giveUpLeadership.countDown();
               closer.close();
             } catch (IOException e) {
               throw new LeaveException("Failed to abdicate leadership of group at " + groupPath,
e);
             }
           }
         });
+
+        // The contract is to block as long as we want leadership. The leader never gives
up
+        // leadership voluntarily, only when asked to shutdown so we block until our shutdown
+        // callback has been executed or we have lost our ZK connection.
+        giveUpLeadership.await();
       }
 
       @Override
-      public void notLeader() {
-        listener.onDefeated();
+      public void stateChanged(CuratorFramework curatorFramework, ConnectionState newState)
{
+        if (newState == ConnectionState.LOST) {
+          giveUpLeadership.countDown();
+          listener.onDefeated();
+          throw new CancelLeadershipException();
+        }
+
       }
-    });
+    };
+
+    LeaderSelector leaderSelector =
+        new LeaderSelector(client, groupPath, executor, leaderSelectorListener);
+
+    leaderSelector.setId(endpoint.getHostName());
 
     try {
-      leaderLatch.start();
+      leaderSelector.start();
     } catch (Exception e) {
       // NB: We failed to lead; so we never could have advertised and there is no need to
close the
       // closer.
       throw new LeadException("Failed to begin awaiting leadership of group " + groupPath,
e);
     }
-    closer.register(leaderLatch);
+    closer.register(leaderSelector);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/1e2a9e16/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
index a2b4125..226b068 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
@@ -73,6 +73,11 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
     getServer().expireClientSession(curator.getZookeeperClient().getZooKeeper().getSessionId());
   }
 
+  final void causeDisconnection() throws Exception {
+    getServer().stop();
+    getServer().restartNetwork();
+  }
+
   final CuratorFramework getClient() {
     return client;
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/1e2a9e16/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
index 6ea49b0..946a78e 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
@@ -15,6 +15,8 @@ package org.apache.aurora.scheduler.discovery;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -23,10 +25,11 @@ import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.easymock.Capture;
-import org.easymock.IAnswer;
 import org.easymock.IMocksControl;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createControl;
@@ -39,6 +42,13 @@ import static org.junit.Assert.assertTrue;
 public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest {
 
   private IMocksControl control;
+  // This test has a lot of blocking, this ensures we don't deadlock.
+  private final Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+
+  @Rule
+  public Timeout getTimeout() {
+    return timeout;
+  }
 
   @Before
   public void setUpSingletonService() throws Exception {
@@ -80,7 +90,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest
{
 
     // Wait to become leader.
     expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
-    assertTrue(capture.hasCaptured());
+    awaitCapture(capture);
 
     // Leadership nodes should not be seen as service group nodes.
     assertEquals(ImmutableSet.of(), getGroupMonitor().get());
@@ -111,7 +121,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest
{
     // Have host1 become leader.
     newLeader(getClient(), "host1", host1Listener);
     expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
-    assertTrue(host1OnLeadingCapture.hasCaptured());
+    awaitCapture(host1OnLeadingCapture);
 
     host1OnLeadingCapture.getValue().advertise();
     expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
@@ -140,7 +150,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest
{
 
     CountDownLatch host1Defeated = new CountDownLatch(1);
     host1Listener.onDefeated();
-    expectLastCall().andAnswer((IAnswer<Void>) () -> {
+    expectLastCall().andAnswer(() -> {
       host1Defeated.countDown();
       return null;
     });
@@ -157,7 +167,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest
{
     // Have host1 become leader.
     newLeader(getClient(), "host1", host1Listener);
     expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
-    assertTrue(host1OnLeadingCapture.hasCaptured());
+    awaitCapture(host1OnLeadingCapture);
 
     host1OnLeadingCapture.getValue().advertise();
     expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
@@ -186,6 +196,46 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest
{
     host1Defeated.await();
   }
 
+  @Test
+  public void testZKDisconnection() throws Exception {
+    CountDownLatch leading = new CountDownLatch(1);
+    AtomicBoolean leader = new AtomicBoolean(false);
+    CountDownLatch defeated = new CountDownLatch(1);
+
+    // The listener is executed in an internal thread of Curator, where it executes the leader
+    // listener callbacks. The exceptions there are not propagated out, so we have our own
+    // listener to validate behaviour.
+    SingletonService.LeadershipListener listener = new SingletonService.LeadershipListener()
{
+      @Override
+      public void onLeading(SingletonService.LeaderControl leaderControl) {
+        leader.set(true);
+        leading.countDown();
+      }
+
+      @Override
+      public void onDefeated() {
+        leader.set(false);
+        defeated.countDown();
+      }
+    };
+
+    control.replay();
+
+    startGroupMonitor();
+
+    CuratorFramework client = getClient();
+    newLeader(client, "host1", listener);
+    leading.await();
+
+    causeDisconnection();
+    assertTrue(leader.get());
+
+    expireSession(client);
+    defeated.await();
+
+    assertFalse(leader.get());
+  }
+
   private void awaitCapture(Capture<?> capture) throws InterruptedException {
     while (!capture.hasCaptured()) {
       Thread.sleep(1L);


Mime
View raw message