Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 412FA11A78 for ; Tue, 17 Jun 2014 23:03:07 +0000 (UTC) Received: (qmail 61214 invoked by uid 500); 17 Jun 2014 23:03:06 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 61120 invoked by uid 500); 17 Jun 2014 23:03:06 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 60969 invoked by uid 99); 17 Jun 2014 23:03:06 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jun 2014 23:03:06 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id F0954981A6C; Tue, 17 Jun 2014 23:03:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Tue, 17 Jun 2014 23:03:13 -0000 Message-Id: <05ecca29d07f4940ba99a8f33d17d24d@git.apache.org> In-Reply-To: <3c4b24e290f6406fa2e9272ee145c47e@git.apache.org> References: <3c4b24e290f6406fa2e9272ee145c47e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/18] git commit: CURATOR-110 - Moved the 'wait until connection established' logic into the ExecuteAfterConnectionEstablished utility class. Cleaned up the blockUntilConnected() logic in the CuratorFrameworkImpl CURATOR-110 - Moved the 'wait until connection established' logic into the ExecuteAfterConnectionEstablished utility class. 
Cleaned up the blockUntilConnected() logic in the CuratorFrameworkImpl Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e8138ed9 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e8138ed9 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e8138ed9 Branch: refs/heads/master Commit: e8138ed9768e99e98e308845626e36aa55afadb0 Parents: 63d0401 Author: Cameron McKenzie Authored: Mon Jun 16 15:59:56 2014 +1000 Committer: Cameron McKenzie Committed: Mon Jun 16 15:59:56 2014 +1000 ---------------------------------------------------------------------- .../curator/framework/CuratorFramework.java | 9 +--- .../framework/imps/CuratorFrameworkImpl.java | 22 +++----- .../framework/state/ConnectionStateManager.java | 5 ++ .../ExecuteAfterConnectionEstablished.java | 53 ++++++++++++++++++++ .../framework/recipes/leader/LeaderLatch.java | 44 ++++++---------- 5 files changed, 81 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 1df3fa5..13cff30 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -213,14 +213,7 @@ public interface CuratorFramework extends Closeable * @param watcher the watcher */ public void clearWatcherReferences(Watcher watcher); - - /** - * Get the current connection state. The connection state will have a value of 0 until - * the first connection related event is received. 
- * @return The current connection state, or null if it is unknown - */ - public ConnectionState getCurrentConnectionState(); - + /** * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 33e260d..d1de29f 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -67,7 +67,6 @@ public class CuratorFrameworkImpl implements CuratorFramework private final BlockingQueue> backgroundOperations; private final NamespaceImpl namespace; private final ConnectionStateManager connectionStateManager; - private final AtomicReference connectionState; private final AtomicReference authInfo = new AtomicReference(); private final byte[] defaultData; private final FailedDeleteManager failedDeleteManager; @@ -75,6 +74,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private final ACLProvider aclProvider; private final NamespaceFacadeCache namespaceFacadeCache; private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); + private final Object connectionLock = new Object(); private volatile ExecutorService executorService; private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); @@ -151,7 +151,6 @@ public class CuratorFrameworkImpl implements CuratorFramework namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory 
= getThreadFactory(builder); connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory()); - connectionState = new AtomicReference(null); compressionProvider = builder.getCompressionProvider(); aclProvider = builder.getAclProvider(); state = new AtomicReference(CuratorFrameworkState.LATENT); @@ -174,10 +173,9 @@ public class CuratorFrameworkImpl implements CuratorFramework @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { - connectionState.set(newState); - synchronized(connectionState) + synchronized(connectionLock) { - connectionState.notifyAll(); + connectionLock.notifyAll(); } } }); @@ -220,7 +218,6 @@ public class CuratorFrameworkImpl implements CuratorFramework threadFactory = parent.threadFactory; backgroundOperations = parent.backgroundOperations; connectionStateManager = parent.connectionStateManager; - connectionState = parent.connectionState; defaultData = parent.defaultData; failedDeleteManager = parent.failedDeleteManager; compressionProvider = parent.compressionProvider; @@ -890,16 +887,11 @@ public class CuratorFrameworkImpl implements CuratorFramework ); } - public ConnectionState getCurrentConnectionState() - { - return connectionState.get(); - } - @Override public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException { //Check if we're already connected - ConnectionState currentConnectionState = connectionState.get(); + ConnectionState currentConnectionState = connectionStateManager.getCurrentConnectionState(); if(currentConnectionState != null && currentConnectionState.isConnected()) { return true; @@ -910,9 +902,9 @@ public class CuratorFrameworkImpl implements CuratorFramework for(;;) { - synchronized(connectionState) + synchronized(connectionLock) { - currentConnectionState = connectionState.get(); + currentConnectionState = connectionStateManager.getCurrentConnectionState(); if(currentConnectionState != null && 
currentConnectionState.isConnected()) { return true; @@ -930,7 +922,7 @@ public class CuratorFrameworkImpl implements CuratorFramework } } - connectionState.wait(waitTime); + connectionLock.wait(waitTime); } } } http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 42804b8..ba29994 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -188,6 +188,11 @@ public class ConnectionStateManager implements Closeable return true; } + + public synchronized ConnectionState getCurrentConnectionState() + { + return currentConnectionState; + } private void postState(ConnectionState state) { http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java new file mode 100644 index 0000000..d213d37 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java @@ -0,0 +1,53 @@ +package org.apache.curator.framework.recipes; + +import java.util.concurrent.ExecutorService; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ThreadUtils; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class to allow execution of logic once a ZooKeeper connection becomes available. + * + */ +public class ExecuteAfterConnectionEstablished +{ + private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class); + + /** + * Spawns a new background thread that will block until a connection is available and + * then execute the 'runAfterConnection' logic + * @param name The name of the spawned thread + * @param client The curator client + * @param runAfterConnection The logic to run + */ + public static void executeAfterConnectionEstablishedInBackground(String name, + final CuratorFramework client, + final Runnable runAfterConnection) + { + //Block until connected + final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(name); + executor.submit(new Runnable() + { + + @Override + public void run() + { + try + { + client.blockUntilConnected(); + runAfterConnection.run(); + } + catch(Exception e) + { + log.error("An error occurred blocking until a connection is available", e); + } + finally + { + executor.shutdown(); + } + } + }); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 21e8cca..13a9f21 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -27,6 +27,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; 
import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished; import org.apache.curator.framework.recipes.locks.LockInternals; import org.apache.curator.framework.recipes.locks.LockInternalsSorter; import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver; @@ -148,14 +149,6 @@ public class LeaderLatch implements Closeable this.id = Preconditions.checkNotNull(id, "id cannot be null"); this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); } - - private CountDownLatch startLatch; - - public LeaderLatch(CuratorFramework client, String latchPath, - CountDownLatch startLatch) { - this(client, latchPath); - this.startLatch = startLatch; - } /** * Add this instance to the leadership election and attempt to acquire leadership. @@ -166,30 +159,23 @@ public class LeaderLatch implements Closeable { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); - //Block until connected - final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(""); - executor.submit(new Runnable() - { - - @Override + ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground(LeaderLatch.class.getName(), + client, new Runnable() + { + @Override public void run() { - try - { - client.blockUntilConnected(); - - client.getConnectionStateListenable().addListener(listener); - reset(); - } - catch(Exception ex) - { - log.error("An error occurred checking resetting leadership.", ex); - } finally { - //Shutdown the executor - executor.shutdown(); - } + try + { + client.getConnectionStateListenable().addListener(listener); + reset(); + } + catch(Exception ex) + { + log.error("An error occurred checking resetting leadership.", ex); + } } - }); + }); } /**