Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5B15218CDE for ; Tue, 1 Sep 2015 13:02:26 +0000 (UTC) Received: (qmail 60149 invoked by uid 500); 1 Sep 2015 13:02:26 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 60064 invoked by uid 500); 1 Sep 2015 13:02:26 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 59855 invoked by uid 99); 1 Sep 2015 13:02:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Sep 2015 13:02:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 16019DFF82; Tue, 1 Sep 2015 13:02:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Tue, 01 Sep 2015 13:02:35 -0000 Message-Id: In-Reply-To: <881087f9832f4ae08cb5006b17f6dd3f@git.apache.org> References: <881087f9832f4ae08cb5006b17f6dd3f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/29] curator git commit: further refactoring. Abstracted old framework-level connection handling into ClassicInternalConnectionHandler. Probably more to do here further refactoring. Abstracted old framework-level connection handling into ClassicInternalConnectionHandler. Probably more to do here Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/30bd7b65 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/30bd7b65 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/30bd7b65 Branch: refs/heads/CURATOR-3.0 Commit: 30bd7b655d201762d8ff74062964621879ac7134 Parents: e239137 Author: randgalt Authored: Sat Aug 22 19:29:36 2015 -0500 Committer: randgalt Committed: Sat Aug 22 19:29:36 2015 -0500 ---------------------------------------------------------------------- .../imps/ClassicInternalConnectionHandler.java | 58 ++++++++++++++++++ .../framework/imps/CuratorFrameworkImpl.java | 64 ++++++-------------- .../imps/InternalConnectionHandler.java | 10 +++ .../imps/StandardInternalConnectionHandler.java | 22 +++++++ .../framework/state/ConnectionStateManager.java | 8 ++- 5 files changed, 112 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java new file mode 100644 index 0000000..1de6e80 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java @@ -0,0 +1,58 @@ +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.state.ConnectionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ClassicInternalConnectionHandler implements InternalConnectionHandler +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Override + public void checkNewConnection(CuratorFrameworkImpl client) + { + // NOP + } + + @Override + public boolean checkSessionExpirationEnabled() + { + return false; + } + + @Override + public void suspendConnection(CuratorFrameworkImpl client) + { + if ( client.setToSuspended() ) + { + doSyncForSuspendedConnection(client, client.getZookeeperClient().getInstanceIndex()); + } + } + + private void doSyncForSuspendedConnection(final CuratorFrameworkImpl client, final long instanceIndex) + { + // we appear to have disconnected, force a new ZK event and see if we can connect to another server + final BackgroundOperation operation = new BackgroundSyncImpl(client, null); + OperationAndData.ErrorCallback errorCallback = new OperationAndData.ErrorCallback() + { + @Override + public void retriesExhausted(OperationAndData operationAndData) + { + // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated + // so the pending background sync is no longer valid. + // if instanceIndex is -1, this is the second try to sync - punt and mark the connection lost + if ( (instanceIndex < 0) || (instanceIndex == client.getZookeeperClient().getInstanceIndex()) ) + { + client.addStateChange(ConnectionState.LOST); + } + else + { + log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying."); + // send -1 to signal that if it happens again, punt and mark the connection lost + doSyncForSuspendedConnection(client, -1); + } + } + }; + client.performBackgroundOperation(new OperationAndData(operation, "/", null, errorCallback, null)); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 44a8ec6..b04987d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -85,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); private final boolean useContainerParentsIfAvailable; private final AtomicLong currentInstanceIndex = new AtomicLong(-1); + private final InternalConnectionHandler internalConnectionHandler; private volatile ExecutorService executorService; private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); @@ -125,13 +126,14 @@ public class CuratorFrameworkImpl implements CuratorFramework builder.getConnectionHandlingPolicy() ); + internalConnectionHandler = builder.getConnectionHandlingPolicy().isEmulatingClassicHandling() ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler(); listeners = new ListenerContainer(); unhandledErrorListeners = new ListenerContainer(); backgroundOperations = new DelayQueue>(); namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); maxCloseWaitMs = builder.getMaxCloseWaitMs(); - connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs()); + connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), internalConnectionHandler.checkSessionExpirationEnabled()); compressionProvider = builder.getCompressionProvider(); aclProvider = builder.getAclProvider(); state = new AtomicReference(CuratorFrameworkState.LATENT); @@ -209,6 +211,7 @@ public class CuratorFrameworkImpl implements CuratorFramework state = parent.state; authInfos = parent.authInfos; useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable; + internalConnectionHandler = parent.internalConnectionHandler; } @Override @@ -676,7 +679,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { if ( state == Watcher.Event.KeeperState.Disconnected ) { - suspendConnection(); + internalConnectionHandler.suspendConnection(this); } else if ( state == Watcher.Event.KeeperState.Expired ) { @@ -684,26 +687,23 @@ public class CuratorFrameworkImpl implements CuratorFramework } else if ( state == Watcher.Event.KeeperState.SyncConnected ) { - checkNewConnection(); + internalConnectionHandler.checkNewConnection(this); connectionStateManager.addStateChange(ConnectionState.RECONNECTED); } else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly ) { - checkNewConnection(); + internalConnectionHandler.checkNewConnection(this); connectionStateManager.addStateChange(ConnectionState.READ_ONLY); } } - private void checkNewConnection() + void checkInstanceIndex() { - if ( !client.getConnectionHandlingPolicy().isEmulatingClassicHandling() ) + long instanceIndex = client.getInstanceIndex(); + long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex); + if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) ) // currentInstanceIndex is initially -1 - ignore this { - long instanceIndex = client.getInstanceIndex(); - long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex); - if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) ) // currentInstanceIndex is initially -1 - ignore this - { - connectionStateManager.addStateChange(ConnectionState.LOST); - } + connectionStateManager.addStateChange(ConnectionState.LOST); } } @@ -742,44 +742,14 @@ public class CuratorFrameworkImpl implements CuratorFramework return null; } - private void suspendConnection() + boolean setToSuspended() { - if ( !connectionStateManager.setToSuspended() ) - { - return; - } - - if ( client.getConnectionHandlingPolicy().isEmulatingClassicHandling() ) - { - doSyncForSuspendedConnection(client.getInstanceIndex()); - } + return connectionStateManager.setToSuspended(); } - private void doSyncForSuspendedConnection(final long instanceIndex) + void addStateChange(ConnectionState newConnectionState) { - // we appear to have disconnected, force a new ZK event and see if we can connect to another server - final BackgroundOperation operation = new BackgroundSyncImpl(this, null); - OperationAndData.ErrorCallback errorCallback = new OperationAndData.ErrorCallback() - { - @Override - public void retriesExhausted(OperationAndData operationAndData) - { - // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated - // so the pending background sync is no longer valid. - // if instanceIndex is -1, this is the second try to sync - punt and mark the connection lost - if ( (instanceIndex < 0) || (instanceIndex == client.getInstanceIndex()) ) - { - connectionStateManager.addStateChange(ConnectionState.LOST); - } - else - { - log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying."); - // send -1 to signal that if it happens again, punt and mark the connection lost - doSyncForSuspendedConnection(-1); - } - } - }; - performBackgroundOperation(new OperationAndData(operation, "/", null, errorCallback, null)); + connectionStateManager.addStateChange(newConnectionState); } @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"}) @@ -894,7 +864,7 @@ public class CuratorFrameworkImpl implements CuratorFramework } } - private void performBackgroundOperation(OperationAndData operationAndData) + void performBackgroundOperation(OperationAndData operationAndData) { try { http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java new file mode 100644 index 0000000..e9798d7 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java @@ -0,0 +1,10 @@ +package org.apache.curator.framework.imps; + +interface InternalConnectionHandler +{ + void checkNewConnection(CuratorFrameworkImpl client); + + void suspendConnection(CuratorFrameworkImpl client); + + boolean checkSessionExpirationEnabled(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java new file mode 100644 index 0000000..b0452c6 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java @@ -0,0 +1,22 @@ +package org.apache.curator.framework.imps; + +class StandardInternalConnectionHandler implements InternalConnectionHandler +{ + @Override + public void suspendConnection(CuratorFrameworkImpl client) + { + client.setToSuspended(); + } + + @Override + public boolean checkSessionExpirationEnabled() + { + return true; + } + + @Override + public void checkNewConnection(CuratorFrameworkImpl client) + { + client.checkInstanceIndex(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 2e7492f..406099d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -21,7 +21,6 @@ package org.apache.curator.framework.state; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import org.apache.curator.connection.ConnectionHandlingPolicyStyle; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.utils.ThreadUtils; @@ -67,6 +66,7 @@ public class ConnectionStateManager implements Closeable private final BlockingQueue eventQueue = new ArrayBlockingQueue(QUEUE_SIZE); private final CuratorFramework client; private final int sessionTimeoutMs; + private final boolean checkSessionExpiration; private final ListenerContainer listeners = new ListenerContainer(); private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false); private final ExecutorService service; @@ -88,11 +88,13 @@ public class ConnectionStateManager implements Closeable * @param client the client * @param threadFactory thread factory to use or null for a default * @param sessionTimeoutMs the ZK session timeout in milliseconds + * @param checkSessionExpiration if true, check for session timeouts, etc. ala new connection handling method */ - public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs) + public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, boolean checkSessionExpiration) { this.client = client; this.sessionTimeoutMs = sessionTimeoutMs; + this.checkSessionExpiration = checkSessionExpiration; if ( threadFactory == null ) { threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager"); @@ -270,7 +272,7 @@ public class ConnectionStateManager implements Closeable } ); } - else if ( !client.getZookeeperClient().getConnectionHandlingPolicy().isEmulatingClassicHandling() ) + else if ( checkSessionExpiration ) { synchronized(this) {