Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E2C1510645 for ; Tue, 15 Sep 2015 02:31:35 +0000 (UTC) Received: (qmail 86921 invoked by uid 500); 15 Sep 2015 02:31:13 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 86892 invoked by uid 500); 15 Sep 2015 02:31:13 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 86883 invoked by uid 99); 15 Sep 2015 02:31:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Sep 2015 02:31:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AD48DE07D6; Tue, 15 Sep 2015 02:31:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: curator git commit: Added an option for session expiration management to be a fraction of the negotiated session timeout. This is meant to account for timing/network differences between the client and server. Date: Tue, 15 Sep 2015 02:31:13 +0000 (UTC) Repository: curator Updated Branches: refs/heads/CURATOR-3.0 24de710d2 -> 64bb8841a Added an option for session expiration management to be a fraction of the negotiated session timeout. This is meant to account for timing/network differences between the client and server. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/64bb8841 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/64bb8841 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/64bb8841 Branch: refs/heads/CURATOR-3.0 Commit: 64bb8841a39e1d82de091d23d1865ba47236e5ad Parents: 24de710 Author: randgalt Authored: Mon Sep 14 21:04:36 2015 -0500 Committer: randgalt Committed: Mon Sep 14 21:04:36 2015 -0500 ---------------------------------------------------------------------- .../ClassicConnectionHandlingPolicy.java | 4 +-- .../connection/ConnectionHandlingPolicy.java | 27 +++++++++++++++++--- .../StandardConnectionHandlingPolicy.java | 17 ++++++++++-- .../imps/ClassicInternalConnectionHandler.java | 6 ----- .../framework/imps/CuratorFrameworkImpl.java | 7 ++--- .../imps/InternalConnectionHandler.java | 2 -- .../imps/StandardInternalConnectionHandler.java | 6 ----- .../framework/state/ConnectionStateManager.java | 13 +++++----- 8 files changed, 52 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java index e4c59f4..8116308 100644 --- a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java +++ b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java @@ -28,9 +28,9 @@ import java.util.concurrent.Callable; public class ClassicConnectionHandlingPolicy implements ConnectionHandlingPolicy { @Override - public boolean isEmulatingClassicHandling() + public int getSimulatedSessionExpirationPercent() { - return true; + return 0; } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java index ae77861..c47577d 100644 --- a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java +++ b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java @@ -29,11 +29,32 @@ import java.util.concurrent.Callable; public interface ConnectionHandlingPolicy { /** - * Return true if this policy should behave like the pre-3.0.0 version of Curator + *

+ * Prior to 3.0.0, Curator did not try to manage session expiration + * other than the functionality provided by ZooKeeper itself. Starting with + * 3.0.0, Curator has the option of attempting to monitor session expiration + * above what is provided by ZooKeeper. The percentage returned by this method + * determines how and if Curator will check for session expiration. + *

* - * @return true/false + *

+ * If this method returns 0, Curator does not + * do any additional checking for session expiration. + *

+ * + *

+ * If a positive number is returned, Curator will check for session expiration + * as follows: when ZooKeeper sends a Disconnect event, Curator will start a timer. + * If re-connection is not achieved before the elapsed time exceeds the negotiated + * session time multiplied by the session expiration percent, Curator will simulate + * a session expiration. Due to timing/network issues, it is not possible for + * a client to match the server's session timeout with complete accuracy. Thus, the need + * for a session expiration percentage. + *

+ * + * @return a percentage from 0 to 100 (0 implied no extra session checking) */ - boolean isEmulatingClassicHandling(); + int getSimulatedSessionExpirationPercent(); /** * Called by {@link RetryLoop#callWithRetry(CuratorZookeeperClient, Callable)} to do the work http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java index ffbc4cb..9f311de 100644 --- a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java +++ b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java @@ -18,6 +18,7 @@ */ package org.apache.curator.connection; +import com.google.common.base.Preconditions; import org.apache.curator.CuratorZookeeperClient; import org.apache.curator.RetryLoop; import org.slf4j.Logger; @@ -32,11 +33,23 @@ import java.util.concurrent.Callable; public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy { private final Logger log = LoggerFactory.getLogger(getClass()); + private final int expirationPercent; + + public StandardConnectionHandlingPolicy() + { + this(100); + } + + public StandardConnectionHandlingPolicy(int expirationPercent) + { + Preconditions.checkArgument((expirationPercent > 0) && (expirationPercent <= 100), "expirationPercent must be > 0 and <= 100"); + this.expirationPercent = expirationPercent; + } @Override - public boolean isEmulatingClassicHandling() + public int getSimulatedSessionExpirationPercent() { - return false; + return expirationPercent; } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java index e2f3c11..63ba665 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java @@ -33,12 +33,6 @@ class ClassicInternalConnectionHandler implements InternalConnectionHandler } @Override - public boolean checkSessionExpirationEnabled() - { - return false; - } - - @Override public void suspendConnection(CuratorFrameworkImpl client) { if ( client.setToSuspended() ) http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 848ccf2..da9067d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -38,9 +38,9 @@ import org.apache.curator.framework.api.transaction.TransactionOp; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateErrorPolicy; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateManager; -import org.apache.curator.framework.state.ConnectionStateErrorPolicy; import org.apache.curator.utils.DebugUtils; import org.apache.curator.utils.EnsurePath; import org.apache.curator.utils.ThreadUtils; @@ -128,14 +128,15 @@ public class CuratorFrameworkImpl implements CuratorFramework builder.getConnectionHandlingPolicy() ); - internalConnectionHandler = builder.getConnectionHandlingPolicy().isEmulatingClassicHandling() ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler(); + boolean isClassic = (builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent() == 0); + internalConnectionHandler = isClassic ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler(); listeners = new ListenerContainer(); unhandledErrorListeners = new ListenerContainer(); backgroundOperations = new DelayQueue>(); namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); maxCloseWaitMs = builder.getMaxCloseWaitMs(); - connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), internalConnectionHandler.checkSessionExpirationEnabled()); + connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent()); compressionProvider = builder.getCompressionProvider(); aclProvider = builder.getAclProvider(); state = new AtomicReference(CuratorFrameworkState.LATENT); http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java index 978dced..65669c3 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java @@ -23,6 +23,4 @@ interface InternalConnectionHandler void checkNewConnection(CuratorFrameworkImpl client); void suspendConnection(CuratorFrameworkImpl client); - - boolean checkSessionExpirationEnabled(); } http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java index f600ad0..be0c726 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java @@ -27,12 +27,6 @@ class StandardInternalConnectionHandler implements InternalConnectionHandler } @Override - public boolean checkSessionExpirationEnabled() - { - return true; - } - - @Override public void checkNewConnection(CuratorFrameworkImpl client) { client.checkInstanceIndex(); http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 3d44d45..b6f2e02 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -66,7 +66,7 @@ public class ConnectionStateManager implements Closeable private final BlockingQueue eventQueue = new ArrayBlockingQueue(QUEUE_SIZE); private final CuratorFramework client; private final int sessionTimeoutMs; - private final boolean checkSessionExpiration; + private final int sessionExpirationPercent; private final ListenerContainer listeners = new ListenerContainer(); private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false); private final ExecutorService service; @@ -88,13 +88,13 @@ public class ConnectionStateManager implements Closeable * @param client the client * @param threadFactory thread factory to use or null for a default * @param sessionTimeoutMs the ZK session timeout in milliseconds - * @param checkSessionExpiration if true, check for session timeouts, etc. ala new connection handling method + * @param sessionExpirationPercent percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all */ - public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, boolean checkSessionExpiration) + public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent) { this.client = client; this.sessionTimeoutMs = sessionTimeoutMs; - this.checkSessionExpiration = checkSessionExpiration; + this.sessionExpirationPercent = sessionExpirationPercent; if ( threadFactory == null ) { threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager"); @@ -274,7 +274,7 @@ public class ConnectionStateManager implements Closeable } ); } - else if ( checkSessionExpiration ) + else if ( sessionExpirationPercent > 0 ) { synchronized(this) { @@ -296,9 +296,10 @@ public class ConnectionStateManager implements Closeable long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch; int lastNegotiatedSessionTimeoutMs = client.getZookeeperClient().getLastNegotiatedSessionTimeoutMs(); int useSessionTimeoutMs = (lastNegotiatedSessionTimeoutMs > 0) ? lastNegotiatedSessionTimeoutMs : sessionTimeoutMs; + useSessionTimeoutMs = (useSessionTimeoutMs * sessionExpirationPercent) / 100; if ( elapsedMs >= useSessionTimeoutMs ) { - log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Session Timeout ms: %d", elapsedMs, useSessionTimeoutMs)); + log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d", elapsedMs, useSessionTimeoutMs)); try { // LOL - this method was proposed by me (JZ) in 2013 for totally unrelated reasons