Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8258418AB4 for ; Sat, 22 Aug 2015 15:47:11 +0000 (UTC) Received: (qmail 85626 invoked by uid 500); 22 Aug 2015 15:47:11 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 85584 invoked by uid 500); 22 Aug 2015 15:47:11 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 85575 invoked by uid 99); 22 Aug 2015 15:47:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Aug 2015 15:47:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2D26ADFDCE; Sat, 22 Aug 2015 15:47:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Sat, 22 Aug 2015 15:47:12 -0000 Message-Id: In-Reply-To: <1f5fc8c8c0ea4023aab2d8e63b3da9cf@git.apache.org> References: <1f5fc8c8c0ea4023aab2d8e63b3da9cf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] curator git commit: wip wip Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/847cc0d2 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/847cc0d2 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/847cc0d2 Branch: refs/heads/CURATOR-247 Commit: 847cc0d2415f59c2943d4a2734564119ffb38bb1 Parents: b8d4c3d Author: randgalt Authored: Sat Aug 22 10:47:01 2015 -0500 Committer: randgalt Committed: Sat Aug 22 10:47:01 2015 -0500 ---------------------------------------------------------------------- .../org/apache/curator/ConnectionState.java | 15 ++++++-- .../apache/curator/CuratorZookeeperClient.java | 36 ++++++++++++++++++-- .../framework/imps/CuratorFrameworkImpl.java | 12 ++----- .../framework/state/ConnectionStateManager.java | 2 +- .../imps/TestEnabledSessionExpiredState.java | 2 +- ...estResetConnectionWithBackgroundFailure.java | 19 +++++++---- .../java/org/apache/curator/test/Timing.java | 21 ++++++++++++ 7 files changed, 84 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-client/src/main/java/org/apache/curator/ConnectionState.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java index 1dfdbef..c3d6921 100644 --- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java +++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java @@ -52,6 +52,7 @@ class ConnectionState implements Watcher, Closeable private final Queue parentWatchers = new ConcurrentLinkedQueue(); private final AtomicLong instanceIndex = new AtomicLong(); private volatile long connectionStartMs = 0; + private final AtomicBoolean enableTimeoutChecks = new AtomicBoolean(true); ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference tracer, boolean canBeReadOnly) { @@ -67,6 +68,11 @@ class ConnectionState implements Watcher, Closeable zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly); } + void disableTimeoutChecks() + { + enableTimeoutChecks.set(false); + } + ZooKeeper getZooKeeper() throws Exception { if ( SessionFailRetryLoop.sessionForThreadHasFailed() ) @@ -81,10 +87,13 @@ class ConnectionState implements Watcher, Closeable throw exception; } - boolean localIsConnected = isConnected.get(); - if ( !localIsConnected ) + if ( enableTimeoutChecks.get() ) { - checkTimeouts(); + boolean localIsConnected = isConnected.get(); + if ( !localIsConnected ) + { + checkTimeouts(); + } } return zooKeeper.getZooKeeper(); http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java index fbb2f4c..ce6e9d3 100644 --- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java +++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java @@ -50,6 +50,7 @@ public class CuratorZookeeperClient implements Closeable private final int connectionTimeoutMs; private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicReference tracer = new AtomicReference(new DefaultTracerDriver()); + private final boolean manageTimeouts; /** * @@ -61,7 +62,7 @@ public class CuratorZookeeperClient implements Closeable */ public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy) { - this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false); + this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, true); } /** @@ -73,7 +74,7 @@ public class CuratorZookeeperClient implements Closeable */ public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy) { - this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false); + this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, true); } /** @@ -90,6 +91,25 @@ public class CuratorZookeeperClient implements Closeable */ public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly) { + this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, true); + } + + /** + * @param zookeeperFactory factory for creating {@link ZooKeeper} instances + * @param ensembleProvider the ensemble provider + * @param sessionTimeoutMs session timeout + * @param connectionTimeoutMs connection timeout + * @param watcher default watcher or null + * @param retryPolicy the retry policy to use + * @param canBeReadOnly if true, allow ZooKeeper client to enter + * read only mode in case of a network partition. See + * {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)} + * for details + * @param manageTimeouts in general, Curator clients try to manage session/connection timeouts. If this is false, that management is turned off + */ + public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, boolean manageTimeouts) + { + this.manageTimeouts = manageTimeouts; if ( sessionTimeoutMs < connectionTimeoutMs ) { log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs)); @@ -100,6 +120,10 @@ public class CuratorZookeeperClient implements Closeable this.connectionTimeoutMs = connectionTimeoutMs; state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly); + if ( !manageTimeouts ) + { + state.disableTimeoutChecks(); + } setRetryPolicy(retryPolicy); } @@ -302,9 +326,15 @@ public class CuratorZookeeperClient implements Closeable return state.getInstanceIndex(); } + /** + * Returns true if connection timeouts should cause the retry policy to be checked. If false + * is returned, throw a connection exception without retrying + * + * @return true/false + */ public boolean retryConnectionTimeouts() { - return true; + return manageTimeouts; } void addParentWatcher(Watcher watcher) http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index c359fdc..bcbeecd 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -122,15 +122,9 @@ public class CuratorFrameworkImpl implements CuratorFramework } }, builder.getRetryPolicy(), - builder.canBeReadOnly() - ) - { - @Override - public boolean retryConnectionTimeouts() - { - return !enableSessionExpiredState; - } - }; + builder.canBeReadOnly(), + !builder.getEnableSessionExpiredState() // inverse is correct here. By default, CuratorZookeeperClient manages timeouts. The new SessionExpiredState needs this disabled. + ); listeners = new ListenerContainer(); unhandledErrorListeners = new ListenerContainer(); http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 553faac..52e0d07 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -295,7 +295,7 @@ public class ConnectionStateManager implements Closeable long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch; if ( elapsedMs >= sessionTimeoutMs ) { - log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs)); + log.warn(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs)); try { client.getZookeeperClient().reset(); http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java index 150eb50..cd415b1 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java @@ -123,7 +123,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests { Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED); server.stop(); - Thread.sleep(timing.multiple(1.2).session()); + timing.sleepForSession(); Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED); Assert.assertEquals(states.poll(timing.multiple(2).session(), TimeUnit.MILLISECONDS), ConnectionState.LOST); server.restart(); http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java index 7d2cb89..b90311b 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.client; +import com.google.common.collect.Queues; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; @@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests { @@ -53,7 +56,6 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests { server.stop(); - final StringBuilder listenerSequence = new StringBuilder(); LeaderSelector selector = null; Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100)); @@ -74,12 +76,13 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests selector.autoRequeue(); selector.start(); + final BlockingQueue states = Queues.newLinkedBlockingQueue(); ConnectionStateListener listener1 = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { - listenerSequence.append("-").append(newState); + states.add(newState); } }; @@ -90,17 +93,21 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests log.debug("Stopping ZK server"); server.stop(); - timing.forWaiting().sleepABit(); + timing.sleepForSession(); log.debug("Starting ZK server"); server.restart(); - timing.forWaiting().sleepABit(); + + Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED); + Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED); + Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED); log.debug("Stopping ZK server"); server.close(); - timing.forWaiting().sleepABit(); - Assert.assertEquals(listenerSequence.toString(), "-CONNECTED-SUSPENDED-LOST-RECONNECTED-SUSPENDED-LOST"); + Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED); + Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST); } finally { http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-test/src/main/java/org/apache/curator/test/Timing.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/Timing.java b/curator-test/src/main/java/org/apache/curator/test/Timing.java index 753d62d..fc4b314 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Timing.java +++ b/curator-test/src/main/java/org/apache/curator/test/Timing.java @@ -35,6 +35,7 @@ public class Timing private static final int DEFAULT_SECONDS = 10; private static final int DEFAULT_WAITING_MULTIPLE = 5; private static final double SESSION_MULTIPLE = 1.5; + private static final double SESSION_SLEEP_MULTIPLE = 1.75; // has to be at least session + 2/3 of a session to account for missed heartbeat then session expiration /** * Use the default base time @@ -200,6 +201,26 @@ public class Timing } /** + * Sleep enough so that the session should expire + * + * @throws InterruptedException if interrupted + */ + public void sleepForSession() throws InterruptedException + { + TimeUnit.MILLISECONDS.sleep(sessionSleep()); + } + + /** + * Return the value to sleep to ensure a ZK session timeout + * + * @return session sleep timeout + */ + public int sessionSleep() + { + return multiple(SESSION_SLEEP_MULTIPLE).session(); + } + + /** * Return the value to use for ZK session timeout * * @return session timeout