Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 135A110AF3 for ; Wed, 25 Sep 2013 16:10:13 +0000 (UTC) Received: (qmail 3191 invoked by uid 500); 25 Sep 2013 16:10:05 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 3044 invoked by uid 500); 25 Sep 2013 16:10:03 -0000 Mailing-List: contact commits-help@curator.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.incubator.apache.org Delivered-To: mailing list commits@curator.incubator.apache.org Received: (qmail 3000 invoked by uid 99); 25 Sep 2013 16:10:02 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Sep 2013 16:10:02 +0000 X-ASF-Spam-Status: No, hits=-2002.3 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 25 Sep 2013 16:09:59 +0000 Received: (qmail 2938 invoked by uid 99); 25 Sep 2013 16:09:39 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Sep 2013 16:09:38 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 89B5C90A054; Wed, 25 Sep 2013 16:09:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.incubator.apache.org Date: Wed, 25 Sep 2013 16:09:39 -0000 Message-Id: <0ccfddd2893448dbbe5c6634d9675ee0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] git commit: Work around edge case with protected mode. Make sure created node gets deleted if retries fail during connection loss: see CURATOR-45. The issue is that retries fail due to connection timeout but the node was actually created on the ser X-Virus-Checked: Checked by ClamAV on apache.org Work around edge case with protected mode. Make sure created node gets deleted if retries fail during connection loss: see CURATOR-45. The issue is that retries fail due to connection timeout but the node was actually created on the server. The client will think the create failed, but the node still exists. So, this change attempts to delete the node once the connection is re-established which then aligns the server with what the client believes is true. Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/4b166a09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/4b166a09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/4b166a09 Branch: refs/heads/master Commit: 4b166a09eeb054671ed127b20703476b3103b5d1 Parents: 10eb1ef Author: randgalt Authored: Wed Sep 25 09:01:00 2013 -0700 Committer: randgalt Committed: Wed Sep 25 09:06:24 2013 -0700 ---------------------------------------------------------------------- .../framework/imps/CreateBuilderImpl.java | 352 +++++++++++++------ .../recipes/leader/ChaosMonkeyCnxnFactory.java | 124 +++++++ .../recipes/leader/TestLeaderSelectorEdges.java | 255 ++++++++++++++ 3 files changed, 615 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/4b166a09/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index ee99074..ebd342f 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.imps; import com.google.common.annotations.VisibleForTesting; @@ -23,6 +24,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.*; import org.apache.curator.framework.api.transaction.CuratorTransactionBridge; import org.apache.curator.framework.api.transaction.OperationType; @@ -41,20 +43,20 @@ import java.util.concurrent.atomic.AtomicBoolean; class CreateBuilderImpl implements CreateBuilder, BackgroundOperation { - private final CuratorFrameworkImpl client; - private CreateMode createMode; - private Backgrounding backgrounding; - private boolean createParentsIfNeeded; - private boolean doProtected; - private boolean compress; - private String protectedId; - private ACLing acling; + private final CuratorFrameworkImpl client; + private CreateMode createMode; + private Backgrounding backgrounding; + private boolean createParentsIfNeeded; + private boolean doProtected; + private boolean compress; + private String protectedId; + private ACLing acling; @VisibleForTesting boolean failNextCreateForTesting = false; @VisibleForTesting - static final String PROTECTED_PREFIX = "_c_"; + static final String PROTECTED_PREFIX = "_c_"; CreateBuilderImpl(CuratorFrameworkImpl client) { @@ -68,7 +70,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation operationAndData) throws Exception { - final TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background"); + final TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background"); client.getZooKeeper().create - ( - operationAndData.getData().getPath(), - operationAndData.getData().getData(), - acling.getAclList(operationAndData.getData().getPath()), - createMode, - new AsyncCallback.StringCallback() - { - @Override - public void processResult(int rc, String path, Object ctx, String name) + ( + operationAndData.getData().getPath(), + operationAndData.getData().getData(), + acling.getAclList(operationAndData.getData().getPath()), + createMode, + new AsyncCallback.StringCallback() { - trace.commit(); - - if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded ) - { - backgroundCreateParentsThenNode(operationAndData); - } - else + @Override + public void processResult(int rc, String path, Object ctx, String name) { - sendBackgroundResponse(rc, path, ctx, name, operationAndData); + trace.commit(); + + if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded ) + { + backgroundCreateParentsThenNode(operationAndData); + } + else + { + sendBackgroundResponse(rc, path, ctx, name, operationAndData); + } } - } - }, - backgrounding.getContext() - ); + }, + backgrounding.getContext() + ); } - private String getProtectedPrefix() + private static String getProtectedPrefix(String protectedId) { return PROTECTED_PREFIX + protectedId + "-"; } private void backgroundCreateParentsThenNode(final OperationAndData mainOperationAndData) { - BackgroundOperation operation = new BackgroundOperation() + BackgroundOperation operation = new BackgroundOperation() { @Override public void performBackgroundOperation(OperationAndData dummy) throws Exception @@ -486,7 +513,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation parentOperation = new OperationAndData(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext()); + OperationAndData parentOperation = new OperationAndData(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext()); client.queueOperation(parentOperation); } @@ -558,16 +585,39 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation operationAndData = new OperationAndData(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext()) + OperationAndData operationAndData = new OperationAndData(this, new PathAndBytes(path, data), backgrounding.getCallback(), + new OperationAndData.ErrorCallback() + { + public void retriesExhausted(OperationAndData operationAndData) + { + if ( doProtected ) + { + // all retries have failed, findProtectedNodeInForeground(..) included, schedule a clean up + findAndDeleteProtectedNodeInBackground(path, protectedId, null); + // assign a new id if this builder is used again later + protectedId = UUID.randomUUID().toString(); + } + } + }, + backgrounding.getContext()) { @Override void callPerformBackgroundOperation() throws Exception { - boolean callSuper = true; - boolean localFirstTime = firstTime.getAndSet(false); + boolean callSuper = true; + boolean localFirstTime = firstTime.getAndSet(false); if ( !localFirstTime && doProtected ) { - String createdPath = findProtectedNodeInForeground(path); + String createdPath = null; + try + { + createdPath = findProtectedNodeInForeground(path); + } + catch ( KeeperException.ConnectionLossException e ) + { + sendBackgroundResponse(KeeperException.Code.CONNECTIONLOSS.intValue(), path, backgrounding.getContext(), null, this); + callSuper = false; + } if ( createdPath != null ) { try @@ -600,117 +650,187 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation() - { - @Override - public String call() throws Exception + final AtomicBoolean firstTime = new AtomicBoolean(true); + String returnPath = RetryLoop.callWithRetry + ( + client.getZookeeperClient(), + new Callable() { - boolean localFirstTime = firstTime.getAndSet(false); - - String createdPath = null; - if ( !localFirstTime && doProtected ) + @Override + public String call() throws Exception { - createdPath = findProtectedNodeInForeground(path); - } + boolean localFirstTime = firstTime.getAndSet(false); - if ( createdPath == null ) - { - try + String createdPath = null; + if ( !localFirstTime && doProtected ) { - createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode); + createdPath = findProtectedNodeInForeground(path); } - catch ( KeeperException.NoNodeException e ) + + if ( createdPath == null ) { - if ( createParentsIfNeeded ) + try { - ZKPaths.mkdirs(client.getZooKeeper(), path, false); createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode); } - else + catch ( KeeperException.NoNodeException e ) { - throw e; + if ( createParentsIfNeeded ) + { + ZKPaths.mkdirs(client.getZooKeeper(), path, false); + createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode); + } + else + { + throw e; + } } } - } - if ( failNextCreateForTesting ) - { - failNextCreateForTesting = false; - throw new KeeperException.ConnectionLossException(); + if ( failNextCreateForTesting ) + { + failNextCreateForTesting = false; + throw new KeeperException.ConnectionLossException(); + } + return createdPath; } - return createdPath; } - } - ); + ); trace.commit(); return returnPath; } - private String findProtectedNodeInForeground(final String path) throws Exception + private String findProtectedNodeInForeground(final String path) throws Exception { - TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-findProtectedNodeInForeground"); + TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-findProtectedNodeInForeground"); - String returnPath = RetryLoop.callWithRetry - ( - client.getZookeeperClient(), - new Callable() - { - @Override - public String call() throws Exception + String returnPath = RetryLoop.callWithRetry + ( + client.getZookeeperClient(), + new Callable() { - String foundNode = null; - try + @Override + public String call() throws Exception { - final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); - List children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false); - - final String protectedPrefix = getProtectedPrefix(); - foundNode = Iterables.find - ( - children, - new Predicate() - { - @Override - public boolean apply(String node) - { - return node.startsWith(protectedPrefix); - } - }, - null - ); - if ( foundNode != null ) + String foundNode = null; + try { - foundNode = ZKPaths.makePath(pathAndNode.getPath(), foundNode); + final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); + List children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false); + + foundNode = findNode(children, pathAndNode.getPath(), protectedId); } + catch ( KeeperException.NoNodeException ignore ) + { + // ignore + } + return foundNode; } - catch ( KeeperException.NoNodeException ignore ) - { - // ignore - } - return foundNode; } - } - ); + ); trace.commit(); return returnPath; } - private String adjustPath(String path) throws Exception + private String adjustPath(String path) throws Exception { if ( doProtected ) { - ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); - String name = getProtectedPrefix() + pathAndNode.getNode(); + ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); + String name = getProtectedPrefix(protectedId) + pathAndNode.getNode(); path = ZKPaths.makePath(pathAndNode.getPath(), name); } return path; } + + /** + * Attempt to delete a protected znode + * + * @param path the path + * @param protectedId the protected id + * @param callback callback to use, null to create a new one + */ + private void findAndDeleteProtectedNodeInBackground(String path, String protectedId, FindProtectedNodeCB callback) + { + if ( client.isStarted() ) + { + if ( callback == null ) + { + callback = new FindProtectedNodeCB(path, protectedId); + } + try + { + client.getChildren().inBackground(callback).forPath(ZKPaths.getPathAndNode(path).getPath()); + } + catch ( Exception e ) + { + findAndDeleteProtectedNodeInBackground(path, protectedId, callback); + } + } + } + + private class FindProtectedNodeCB implements BackgroundCallback + { + final String path; + final String protectedId; + + private FindProtectedNodeCB(String path, String protectedId) + { + this.path = path; + this.protectedId = protectedId; + } + + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + final String node = findNode(event.getChildren(), ZKPaths.getPathAndNode(path).getPath(), protectedId); + if ( node != null ) + { + client.delete().guaranteed().inBackground().forPath(node); + } + } + else if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() ) + { + // retry + findAndDeleteProtectedNodeInBackground(path, protectedId, this); + } + } + } + + /** + * Attempt to find the znode that matches the given path and protected id + * + * @param children a list of candidates znodes + * @param path the path + * @param protectedId the protected id + * @return the absolute path of the znode or null if it is not found + */ + private static String findNode(final List children, final String path, final String protectedId) + { + final String protectedPrefix = getProtectedPrefix(protectedId); + String foundNode = Iterables.find + ( + children, + new Predicate() + { + @Override + public boolean apply(String node) + { + return node.startsWith(protectedPrefix); + } + }, + null + ); + if ( foundNode != null ) + { + foundNode = ZKPaths.makePath(path, foundNode); + } + return foundNode; + } } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/4b166a09/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java new file mode 100644 index 0000000..5f10c5e --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.leader; + +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.server.ByteBufferInputStream; +import org.apache.zookeeper.server.NIOServerCnxn; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +/** + * A connection factory that will behave like the NIOServerCnxnFactory except that + * it will unexpectedly close the connection right after the first znode has + * been created in Zookeeper. + * Subsequent create operations will succeed. + */ +public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory +{ + public static final String CHAOS_ZNODE = "/mylock"; + public static final String CHAOS_ZNODE_PREFIX = CHAOS_ZNODE + "/"; + + private static final Logger log = LoggerFactory.getLogger(ChaosMonkeyCnxnFactory.class); + + /* How long after the first error, connections are rejected */ + public static final long LOCKOUT_DURATION_MS = 6000; + + public ChaosMonkeyCnxnFactory() throws IOException + { + } + + @Override + public void startup(ZooKeeperServer zks) throws IOException, InterruptedException + { + super.startup(new ChaosMonkeyZookeeperServer(zks)); + } + + /** + * Build a connection with a Chaos Monkey ZookeeperServer + */ + protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException + { + return new NIOServerCnxn(zkServer, sock, sk, this); + } + + public static class ChaosMonkeyZookeeperServer extends ZooKeeperServer + { + private long firstError = 0; + + public ChaosMonkeyZookeeperServer(ZooKeeperServer zks) + { + setTxnLogFactory(zks.getTxnLogFactory()); + setTickTime(zks.getTickTime()); + setMinSessionTimeout(zks.getMinSessionTimeout()); + setMaxSessionTimeout(zks.getMaxSessionTimeout()); + } + + @Override + public void submitRequest(Request si) + { + long remaining = firstError != 0 ? LOCKOUT_DURATION_MS - (System.currentTimeMillis() - firstError) : 0; + if ( si.type != ZooDefs.OpCode.createSession && si.type != ZooDefs.OpCode.sync && si.type != ZooDefs.OpCode.ping + && firstError != 0 && remaining > 0 ) + { + log.debug("Rejected : " + si.toString()); + // Still reject request + log.debug("Still not ready for " + remaining + "ms"); + ((NIOServerCnxn)si.cnxn).close(); + return; + } + // Submit the request to the legacy Zookeeper server + log.debug("Applied : " + si.toString()); + super.submitRequest(si); + // Raise an error if a lock is created + if ( si.type == ZooDefs.OpCode.create ) + { + CreateRequest createRequest = new CreateRequest(); + try + { + ByteBuffer duplicate = si.request.duplicate(); + duplicate.rewind(); + ByteBufferInputStream.byteBuffer2Record(duplicate, createRequest); + if ( createRequest.getPath().startsWith(CHAOS_ZNODE_PREFIX) + && firstError == 0 ) + { + firstError = System.currentTimeMillis(); + // The znode has been created, close the connection and don't tell it to client + log.warn("Closing connection right after " + createRequest.getPath() + " creation"); + ((NIOServerCnxn)si.cnxn).close(); + } + } + catch ( Exception e ) + { + // Should not happen + ((NIOServerCnxn)si.cnxn).close(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/4b166a09/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java new file mode 100644 index 0000000..e89c958 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.leader; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.recipes.BaseClassForTests; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryNTimes; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Test cases designed after CURATOR-45 + */ +public class TestLeaderSelectorEdges extends BaseClassForTests +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + + @BeforeClass + public static void setCNXFactory() + { + System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, ChaosMonkeyCnxnFactory.class.getName()); + } + + @AfterClass + public static void resetCNXFactory() + { + System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY); + } + + /** + * Create a LeaderSelector but close the connection right after the "lock" znode + * has been created. + * + * @throws Exception + */ + @Test + public void flappingTest() throws Exception + { + final CuratorFramework client = + CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryNTimes(1, 500)) + .sessionTimeoutMs(30000) + .build(); + + final TestLeaderSelectorListener listener = new TestLeaderSelectorListener(); + LeaderSelector leaderSelector1 = + new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE, listener); + LeaderSelector leaderSelector2 = null; + + client.start(); + try + { + client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE); + leaderSelector1.start(); + // At this point the ChaosMonkeyZookeeperServer must close the connection + // right after the lock znode is created. + Assert.assertTrue(listener.reconnected.await(10, TimeUnit.SECONDS), "Connection has not been lost"); + // Check that leader ship has failed + Assert.assertEquals(listener.takeLeadership.getCount(), 1); + // Wait FailedDelete + Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2); + // Check that there is no znode + final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size(); + Assert.assertEquals(children, 0, + "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock"); + // Check that a new LeaderSelector can be started + leaderSelector2 = new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE, + listener); + leaderSelector2.start(); + Assert.assertTrue(listener.takeLeadership.await(1, TimeUnit.SECONDS)); + } + finally + { + try + { + leaderSelector1.close(); + } + catch ( IllegalStateException e ) + { + Assert.fail(e.getMessage()); + } + try + { + if ( leaderSelector2 != null ) + { + leaderSelector2.close(); + } + } + catch ( IllegalStateException e ) + { + Assert.fail(e.getMessage()); + } + client.close(); + } + } + + private class TestLeaderSelectorListener implements LeaderSelectorListener + { + final CountDownLatch takeLeadership = new CountDownLatch(1); + final CountDownLatch reconnected = new CountDownLatch(1); + + @Override + public void takeLeadership(CuratorFramework client) throws Exception + { + log.info("-->takeLeadership({})", client.toString()); + takeLeadership.countDown(); + log.info("<--takeLeadership({})", client.toString()); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( newState == ConnectionState.RECONNECTED ) + { + reconnected.countDown(); + } + } + + } + + /** + * Create a protected node in background with a retry policy + */ + @Test + public void createProtectedNodeInBackgroundTest() throws Exception + { + final CuratorFramework client = + CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryNTimes(2, 1)) + .connectionTimeoutMs(100) + .sessionTimeoutMs(60000) + .build(); + final CountDownLatch latch = new CountDownLatch(1); + client.start(); + try + { + client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE); + client.create() + .withProtection() + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) + .inBackground( + new BackgroundCallback() + { + public void processResult(CuratorFramework client, CuratorEvent event) + throws Exception + { + log.info("Receive event {}", event.toString()); + if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() ) + { + latch.countDown(); + } + } + } + ) + .forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE_PREFIX + "foo-"); + + Assert.assertTrue(latch.await(30, TimeUnit.SECONDS), "Callback has not been called"); + // Wait for the znode to be deleted + Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2); + // Check that there is no znode + final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size(); + Assert.assertEquals(children, 0, + "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock"); + } + finally + { + client.close(); + } + + } + + /** + * Same test as above but without a retry policy + */ + @Test + public void createProtectedNodeInBackgroundTestNoRetry() throws Exception + { + final CuratorFramework client = + CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryNTimes(0, 0)) + .connectionTimeoutMs(100) + .sessionTimeoutMs(60000) + .build(); + final CountDownLatch latch = new CountDownLatch(1); + client.start(); + try + { + client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE); + client.create() + .withProtection() + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) + .inBackground( + new BackgroundCallback() + { + public void processResult(CuratorFramework client, CuratorEvent event) + throws Exception + { + log.info("Receive event {}", event.toString()); + if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() ) + { + latch.countDown(); + } + } + } + ) + .forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE_PREFIX + "foo-"); + + Assert.assertTrue(latch.await(30, TimeUnit.SECONDS), "Callback has not been called"); + // Wait for the znode to be deleted + Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2); + // Check that there is no znode + final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size(); + Assert.assertEquals(children, 0, + "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock"); + } + finally + { + client.close(); + } + + } +}