Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5C7FC11A7C for ; Tue, 17 Jun 2014 23:03:07 +0000 (UTC) Received: (qmail 61454 invoked by uid 500); 17 Jun 2014 23:03:06 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 61360 invoked by uid 500); 17 Jun 2014 23:03:06 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 61033 invoked by uid 99); 17 Jun 2014 23:03:06 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jun 2014 23:03:06 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 18572981A74; Tue, 17 Jun 2014 23:03:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Tue, 17 Jun 2014 23:03:17 -0000 Message-Id: <9b220778f23247bd8539cf06ecb70d58@git.apache.org> In-Reply-To: <3c4b24e290f6406fa2e9272ee145c47e@git.apache.org> References: <3c4b24e290f6406fa2e9272ee145c47e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/18] git commit: Moved the connection blocking code into ConnectionManager. It's cleaner and doesn't require a connection state listener Moved the connection blocking code into ConnectionManager. It's cleaner and doesn't require a connection state listener Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/04cefb47 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/04cefb47 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/04cefb47 Branch: refs/heads/master Commit: 04cefb47f18c9d4bd3a0eb897563dd5abb7c89c8 Parents: 1c94c7e Author: randgalt Authored: Mon Jun 16 14:35:54 2014 -0500 Committer: randgalt Committed: Mon Jun 16 14:35:54 2014 -0500 ---------------------------------------------------------------------- .../framework/imps/CuratorFrameworkImpl.java | 289 +++++++----------- .../framework/state/ConnectionStateManager.java | 38 ++- .../framework/imps/TestBlockUntilConnected.java | 304 +++++++++---------- .../recipes/AfterConnectionEstablished.java | 14 +- .../framework/recipes/leader/LeaderLatch.java | 21 +- .../recipes/leader/TestLeaderLatch.java | 46 +-- 6 files changed, 336 insertions(+), 376 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 14473d8..23a3248 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -16,11 +16,11 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.imps; import com.google.common.base.Function; import com.google.common.base.Preconditions; - import org.apache.curator.CuratorConnectionLossException; import org.apache.curator.CuratorZookeeperClient; import org.apache.curator.RetryLoop; @@ -44,7 +44,6 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.Arrays; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -59,40 +58,40 @@ import java.util.concurrent.atomic.AtomicReference; public class CuratorFrameworkImpl implements CuratorFramework { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final CuratorZookeeperClient client; - private final ListenerContainer listeners; - private final ListenerContainer unhandledErrorListeners; - private final ThreadFactory threadFactory; - private final BlockingQueue> backgroundOperations; - private final NamespaceImpl namespace; - private final ConnectionStateManager connectionStateManager; - private final AtomicReference authInfo = new AtomicReference(); - private final byte[] defaultData; - private final FailedDeleteManager failedDeleteManager; - private final CompressionProvider compressionProvider; - private final ACLProvider aclProvider; - private final NamespaceFacadeCache namespaceFacadeCache; - private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); - private final Object connectionLock = new Object(); - - private volatile ExecutorService executorService; - private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); - - private static final boolean LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL = !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL); + private final Logger log = LoggerFactory.getLogger(getClass()); + private final CuratorZookeeperClient client; + private final ListenerContainer listeners; + private final ListenerContainer unhandledErrorListeners; + private final ThreadFactory threadFactory; + private final BlockingQueue> backgroundOperations; + private final NamespaceImpl namespace; + private final ConnectionStateManager connectionStateManager; + private final AtomicReference authInfo = new AtomicReference(); + private final byte[] defaultData; + private final FailedDeleteManager failedDeleteManager; + private final CompressionProvider compressionProvider; + private final ACLProvider aclProvider; + private final NamespaceFacadeCache namespaceFacadeCache; + private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); + + private volatile ExecutorService executorService; + private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); + + private static final boolean LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL = !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL); interface DebugBackgroundListener { - void listen(OperationAndData data); + void listen(OperationAndData data); } - volatile DebugBackgroundListener debugListener = null; - private final AtomicReference state; + volatile DebugBackgroundListener debugListener = null; + + private final AtomicReference state; private static class AuthInfo { - final String scheme; - final byte[] auth; + final String scheme; + final byte[] auth; private AuthInfo(String scheme, byte[] auth) { @@ -113,37 +112,15 @@ public class CuratorFrameworkImpl implements CuratorFramework public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) { ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory()); - this.client = new CuratorZookeeperClient - ( - localZookeeperFactory, - builder.getEnsembleProvider(), - builder.getSessionTimeoutMs(), - builder.getConnectionTimeoutMs(), - new Watcher() + this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher() + { + @Override + public void process(WatchedEvent watchedEvent) { - @Override - public void process(WatchedEvent watchedEvent) - { - CuratorEvent event = new CuratorEventImpl - ( - CuratorFrameworkImpl.this, - CuratorEventType.WATCHED, - watchedEvent.getState().getIntValue(), - unfixForNamespace(watchedEvent.getPath()), - null, - null, - null, - null, - null, - watchedEvent, - null - ); - processEvent(event); - } - }, - builder.getRetryPolicy(), - builder.canBeReadOnly() - ); + CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null); + processEvent(event); + } + }, builder.getRetryPolicy(), builder.canBeReadOnly()); listeners = new ListenerContainer(); unhandledErrorListeners = new ListenerContainer(); @@ -155,7 +132,7 @@ public class CuratorFrameworkImpl implements CuratorFramework aclProvider = builder.getAclProvider(); state = new AtomicReference(CuratorFrameworkState.LATENT); - byte[] builderDefaultData = builder.getDefaultData(); + byte[] builderDefaultData = builder.getDefaultData(); defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0]; if ( builder.getAuthScheme() != null ) @@ -165,23 +142,6 @@ public class CuratorFrameworkImpl implements CuratorFramework failedDeleteManager = new FailedDeleteManager(this); namespaceFacadeCache = new NamespaceFacadeCache(this); - - //Add callback handler to determine connection state transitions - getConnectionStateListenable().addListener(new ConnectionStateListener() - { - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if(newState.isConnected()) - { - synchronized(connectionLock) - { - connectionLock.notifyAll(); - } - } - } - }); } private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory) @@ -253,7 +213,19 @@ public class CuratorFrameworkImpl implements CuratorFramework } @Override - public void start() + public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException + { + return connectionStateManager.blockUntilConnected(maxWaitTime, units); + } + + @Override + public void blockUntilConnected() throws InterruptedException + { + blockUntilConnected(0, null); + } + + @Override + public void start() { log.info("Starting"); if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) ) @@ -266,7 +238,7 @@ public class CuratorFrameworkImpl implements CuratorFramework try { connectionStateManager.start(); // ordering dependency - must be called before client.start() - + final ConnectionStateListener listener = new ConnectionStateListener() { @Override @@ -278,16 +250,14 @@ public class CuratorFrameworkImpl implements CuratorFramework } } }; - + this.getConnectionStateListenable().addListener(listener); client.start(); executorService = Executors.newFixedThreadPool(2, threadFactory); // 1 for listeners, 1 for background ops - executorService.submit - ( - new Callable() + executorService.submit(new Callable() { @Override public Object call() throws Exception @@ -295,8 +265,7 @@ public class CuratorFrameworkImpl implements CuratorFramework backgroundOperationsLoop(); return null; } - } - ); + }); } catch ( Exception e ) { @@ -305,31 +274,28 @@ public class CuratorFrameworkImpl implements CuratorFramework } @Override - public void close() + public void close() { log.debug("Closing"); if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED) ) { - listeners.forEach - ( - new Function() + listeners.forEach(new Function() + { + @Override + public Void apply(CuratorListener listener) { - @Override - public Void apply(CuratorListener listener) + CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null); + try + { + listener.eventReceived(CuratorFrameworkImpl.this, event); + } + catch ( Exception e ) { - CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null); - try - { - listener.eventReceived(CuratorFrameworkImpl.this, event); - } - catch ( Exception e ) - { - log.error("Exception while sending Closing event", e); - } - return null; + log.error("Exception while sending Closing event", e); } + return null; } - ); + }); listeners.clear(); unhandledErrorListeners.clear(); @@ -515,14 +481,14 @@ public class CuratorFrameworkImpl implements CuratorFramework void processBackgroundOperation(OperationAndData operationAndData, CuratorEvent event) { - boolean isInitialExecution = (event == null); + boolean isInitialExecution = (event == null); if ( isInitialExecution ) { performBackgroundOperation(operationAndData); return; } - boolean doQueueOperation = false; + boolean doQueueOperation = false; do { if ( RetryLoop.shouldRetry(event.getResultCode()) ) @@ -538,7 +504,8 @@ public class CuratorFrameworkImpl implements CuratorFramework } processEvent(event); - } while ( false ); + } + while ( false ); if ( doQueueOperation ) { @@ -560,7 +527,8 @@ public class CuratorFrameworkImpl implements CuratorFramework if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) || !(e instanceof KeeperException) ) { - if ( e instanceof KeeperException.ConnectionLossException || e instanceof CuratorConnectionLossException ) { + if ( e instanceof KeeperException.ConnectionLossException ) + { if ( LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL || logAsErrorConnectionErrors.compareAndSet(true, false) ) { log.error(reason, e); @@ -576,27 +544,24 @@ public class CuratorFrameworkImpl implements CuratorFramework } } - final String localReason = reason; - unhandledErrorListeners.forEach - ( - new Function() + final String localReason = reason; + unhandledErrorListeners.forEach(new Function() + { + @Override + public Void apply(UnhandledErrorListener listener) { - @Override - public Void apply(UnhandledErrorListener listener) - { - listener.unhandledError(localReason, e); - return null; - } + listener.unhandledError(localReason, e); + return null; } - ); + }); } - String unfixForNamespace(String path) + String unfixForNamespace(String path) { return namespace.unfixForNamespace(path); } - String fixForNamespace(String path) + String fixForNamespace(String path) { return namespace.fixForNamespace(path); } @@ -723,8 +688,8 @@ public class CuratorFrameworkImpl implements CuratorFramework sendToBackgroundCallback(operationAndData, event); } - KeeperException.Code code = KeeperException.Code.get(event.getResultCode()); - Exception e = null; + KeeperException.Code code = KeeperException.Code.get(event.getResultCode()); + Exception e = null; try { e = (code != null) ? KeeperException.create(code) : null; @@ -755,7 +720,7 @@ public class CuratorFrameworkImpl implements CuratorFramework } } - private void handleBackgroundOperationException(OperationAndData operationAndData, Throwable e) + private void handleBackgroundOperationException(OperationAndData operationAndData, Throwable e) { do { @@ -788,14 +753,15 @@ public class CuratorFrameworkImpl implements CuratorFramework } logError("Background exception was not retry-able or retry gave up", e); - } while ( false ); + } + while ( false ); } private void backgroundOperationsLoop() { while ( !Thread.interrupted() ) { - OperationAndData operationAndData; + OperationAndData operationAndData; try { operationAndData = backgroundOperations.take(); @@ -850,7 +816,7 @@ public class CuratorFrameworkImpl implements CuratorFramework } else { - logError("Background retry gave up", e); + logError("Background retry gave up", e); } } else @@ -867,70 +833,23 @@ public class CuratorFrameworkImpl implements CuratorFramework validateConnection(curatorEvent.getWatchedEvent().getState()); } - listeners.forEach - ( - new Function() + listeners.forEach(new Function() + { + @Override + public Void apply(CuratorListener listener) { - @Override - public Void apply(CuratorListener listener) + try { - try - { - TimeTrace trace = client.startTracer("EventListener"); - listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent); - trace.commit(); - } - catch ( Exception e ) - { - logError("Event listener threw exception", e); - } - return null; + TimeTrace trace = client.startTracer("EventListener"); + listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent); + trace.commit(); } + catch ( Exception e ) + { + logError("Event listener threw exception", e); + } + return null; } - ); - } - - @Override - public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException - { - //Check if we're already connected - ConnectionState currentConnectionState = connectionStateManager.getCurrentConnectionState(); - if(currentConnectionState != null && currentConnectionState.isConnected()) - { - return true; - } - - long startTime = System.currentTimeMillis(); - long maxWaitTimeMS = TimeUnit.MILLISECONDS.convert(maxWaitTime, units); - - for(;;) - { - synchronized(connectionLock) - { - currentConnectionState = connectionStateManager.getCurrentConnectionState(); - if(currentConnectionState != null && currentConnectionState.isConnected()) - { - return true; - } - - long waitTime = 0; - if(maxWaitTime > 0) - { - waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime); - - //Timeout - if(waitTime <= 0) - { - return false; - } - } - - connectionLock.wait(waitTime); - } - } - } - - public void blockUntilConnected() throws InterruptedException { - blockUntilConnected(0, TimeUnit.SECONDS); + }); } } http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index ba29994..fb312dc 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -33,6 +33,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -153,6 +154,7 @@ public class ConnectionStateManager implements Closeable currentConnectionState = ConnectionState.SUSPENDED; postState(ConnectionState.SUSPENDED); + return true; } @@ -188,15 +190,45 @@ public class ConnectionStateManager implements Closeable return true; } - - public synchronized ConnectionState getCurrentConnectionState() + + public synchronized boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException + { + boolean hasMaxWait = (units != null); + long startTime = System.currentTimeMillis(); + + while ( !isConnected() ) + { + long maxWaitTimeMS = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWaitTime, units) : 0; + + if ( hasMaxWait ) + { + long waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime); + if ( waitTime <= 0 ) + { + return isConnected(); + } + + wait(waitTime); + } + else + { + wait(); + } + } + return isConnected(); + } + + public synchronized boolean isConnected() { - return currentConnectionState; + return (currentConnectionState != null) && currentConnectionState.isConnected(); } private void postState(ConnectionState state) { log.info("State change: " + state); + + notifyAll(); + while ( !eventQueue.offer(state) ) { eventQueue.poll(); http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java index 8dfb7d8..f649afb 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java @@ -16,12 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.framework.imps; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +package org.apache.curator.framework.imps; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -34,202 +30,206 @@ import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class TestBlockUntilConnected extends BaseClassForTests { - /** - * Test the case where we're already connected - */ - @Test - public void testBlockUntilConnectedCurrentlyConnected() - { - Timing timing = new Timing(); + /** + * Test the case where we're already connected + */ + @Test + public void testBlockUntilConnectedCurrentlyConnected() throws Exception + { + Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); - + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + try { - final CountDownLatch connectedLatch = new CountDownLatch(1); - client.getConnectionStateListenable().addListener(new ConnectionStateListener() - { - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if(newState.isConnected()) - { - connectedLatch.countDown(); - } - } - }); - - client.start(); - - Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch"); - Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected"); + final CountDownLatch connectedLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener(new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( newState.isConnected() ) + { + connectedLatch.countDown(); + } + } + }); + + client.start(); + + Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch"); + Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected"); } - catch(InterruptedException e) + catch ( InterruptedException e ) { - Assert.fail("Unexpected interruption"); + Assert.fail("Unexpected interruption"); } finally { - CloseableUtils.closeQuietly(client); + CloseableUtils.closeQuietly(client); } - } - - /** - * Test the case where we are not currently connected and never have been - */ - @Test - public void testBlockUntilConnectedCurrentlyNeverConnected() - { + } + + /** + * Test the case where we are not currently connected and never have been + */ + @Test + public void testBlockUntilConnectedCurrentlyNeverConnected() + { CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); - + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + try { - client.start(); - Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected"); + client.start(); + Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected"); } - catch(InterruptedException e) + catch ( InterruptedException e ) { - Assert.fail("Unexpected interruption"); + Assert.fail("Unexpected interruption"); } finally { - CloseableUtils.closeQuietly(client); + CloseableUtils.closeQuietly(client); } - } - - /** - * Test the case where we are not currently connected, but have been previously - */ - @Test - public void testBlockUntilConnectedCurrentlyAwaitingReconnect() - { - Timing timing = new Timing(); + } + + /** + * Test the case where we are not currently connected, but have been previously + */ + @Test + public void testBlockUntilConnectedCurrentlyAwaitingReconnect() + { + Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); - + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + final CountDownLatch lostLatch = new CountDownLatch(1); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if(newState == ConnectionState.LOST) - { - lostLatch.countDown(); - } - } - }); - + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( newState == ConnectionState.LOST ) + { + lostLatch.countDown(); + } + } + }); + try { - client.start(); - - //Block until we're connected - Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect"); - - //Kill the server - CloseableUtils.closeQuietly(server); - - //Wait until we hit the lost state - Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state"); - - server = new TestingServer(server.getPort(), server.getTempDirectory()); - - Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected"); + client.start(); + + //Block until we're connected + Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect"); + + //Kill the server + CloseableUtils.closeQuietly(server); + + //Wait until we hit the lost state + Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state"); + + server = new TestingServer(server.getPort(), server.getTempDirectory()); + + Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected"); } - catch(Exception e) + catch ( Exception e ) { - Assert.fail("Unexpected exception " + e); + Assert.fail("Unexpected exception " + e); } finally { - CloseableUtils.closeQuietly(client); + CloseableUtils.closeQuietly(client); } - } - - /** - * Test the case where we are not currently connected and time out before a - * connection becomes available. - */ - @Test - public void testBlockUntilConnectedConnectTimeout() - { - //Kill the server - CloseableUtils.closeQuietly(server); - + } + + /** + * Test the case where we are not currently connected and time out before a + * connection becomes available. + */ + @Test + public void testBlockUntilConnectedConnectTimeout() + { + //Kill the server + CloseableUtils.closeQuietly(server); + CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); - + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + try { - client.start(); - Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS), - "Connected"); + client.start(); + Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS), "Connected"); } - catch(InterruptedException e) + catch ( InterruptedException e ) { - Assert.fail("Unexpected interruption"); + Assert.fail("Unexpected interruption"); } finally { - CloseableUtils.closeQuietly(client); + CloseableUtils.closeQuietly(client); } - } - - /** - * Test the case where we are not currently connected and the thread gets interrupted - * prior to a connection becoming available - */ - @Test - public void testBlockUntilConnectedInterrupt() - { - //Kill the server - CloseableUtils.closeQuietly(server); - + } + + /** + * Test the case where we are not currently connected and the thread gets interrupted + * prior to a connection becoming available + */ + @Test + public void testBlockUntilConnectedInterrupt() + { + //Kill the server + CloseableUtils.closeQuietly(server); + final CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); - + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + try { - client.start(); - - final Thread threadToInterrupt = Thread.currentThread(); - - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - - @Override - public void run() { - threadToInterrupt.interrupt(); - } - }, 3000); - - client.blockUntilConnected(5, TimeUnit.SECONDS); - Assert.fail("Expected interruption did not occur"); + client.start(); + + final Thread threadToInterrupt = Thread.currentThread(); + + Timer timer = new Timer(); + timer.schedule(new TimerTask() + { + + @Override + public void run() + { + threadToInterrupt.interrupt(); + } + }, 3000); + + client.blockUntilConnected(5, TimeUnit.SECONDS); + Assert.fail("Expected interruption did not occur"); } - catch(InterruptedException e) + catch ( InterruptedException e ) { - //This is expected + //This is expected } finally { - CloseableUtils.closeQuietly(client); + CloseableUtils.closeQuietly(client); } - } + } } http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java index f37f7c0..41ba702 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java @@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; /** @@ -39,24 +38,23 @@ public class AfterConnectionEstablished * @param client The curator client * @param runAfterConnection The logic to run */ - public static T execute(final CuratorFramework client, final Callable runAfterConnection) throws Exception + public static void execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception { //Block until connected - final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName()); - Callable internalCall = new Callable() + final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass())); + Runnable internalCall = new Runnable() { @Override - public T call() throws Exception + public void run() { try { client.blockUntilConnected(); - return runAfterConnection.call(); + runAfterConnection.run(); } catch ( Exception e ) { log.error("An error occurred blocking until a connection is available", e); - throw e; } finally { @@ -64,7 +62,7 @@ public class AfterConnectionEstablished } } }; - return executor.submit(internalCall).get(); + executor.submit(internalCall); } private AfterConnectionEstablished() http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index f4c1cef..dce3f5e 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -26,7 +26,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.listen.ListenerContainer; -import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished; +import org.apache.curator.framework.recipes.AfterConnectionEstablished; import org.apache.curator.framework.recipes.locks.LockInternals; import org.apache.curator.framework.recipes.locks.LockInternalsSorter; import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver; @@ -156,17 +156,22 @@ public class LeaderLatch implements Closeable { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); - ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground + AfterConnectionEstablished.execute ( - client, - new Callable() + client, new Runnable() { @Override - public Void call() throws Exception + public void run() { client.getConnectionStateListenable().addListener(listener); - reset(); - return null; + try + { + reset(); + } + catch ( Exception e ) + { + log.error("An error occurred checking resetting leadership.", e); + } } } ); @@ -556,7 +561,7 @@ public class LeaderLatch implements Closeable private void handleStateChange(ConnectionState newState) { - if (newState.isConnected()) + if ( newState == ConnectionState.RECONNECTED ) { try { http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 35d8809..b97e708 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -22,7 +22,6 @@ package org.apache.curator.framework.recipes.leader; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.state.ConnectionState; @@ -35,7 +34,6 @@ import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; - import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; @@ -215,6 +213,17 @@ public class TestLeaderLatch extends BaseClassForTests @Test public void testWaiting() throws Exception { + final int LOOPS = 10; + for ( int i = 0; i < LOOPS; ++i ) + { + System.out.println("TRY #" + i); + internalTestWaitingOnce(); + Thread.sleep(10); + } + } + + private void internalTestWaitingOnce() throws Exception + { final int PARTICIPANT_QTY = 10; ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY); @@ -241,10 +250,10 @@ public class TestLeaderLatch extends BaseClassForTests Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS)); Assert.assertTrue(thereIsALeader.compareAndSet(false, true)); Thread.sleep((int)(10 * Math.random())); + thereIsALeader.set(false); } finally { - thereIsALeader.set(false); latch.close(); } return null; @@ -259,7 +268,7 @@ public class TestLeaderLatch extends BaseClassForTests } finally { - executorService.shutdown(); + executorService.shutdownNow(); CloseableUtils.closeQuietly(client); } } @@ -526,23 +535,21 @@ public class TestLeaderLatch extends BaseClassForTests CloseableUtils.closeQuietly(client); } } - + @Test public void testNoServerAtStart() - { + { CloseableUtils.closeQuietly(server); Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.newClient( - server.getConnectString(), timing.session(), - timing.connection(), new RetryNTimes(5, 1000)); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryNTimes(5, 1000)); client.start(); final LeaderLatch leader = new LeaderLatch(client, PATH_NAME); final CountDownLatch leaderCounter = new CountDownLatch(1); final AtomicInteger leaderCount = new AtomicInteger(0); - final AtomicInteger notLeaderCount = new AtomicInteger(0); + final AtomicInteger notLeaderCount = new AtomicInteger(0); leader.addListener(new LeaderLatchListener() { @Override @@ -555,27 +562,26 @@ public class TestLeaderLatch extends BaseClassForTests @Override public void notLeader() { - notLeaderCount.incrementAndGet(); + notLeaderCount.incrementAndGet(); } }); try { - leader.start(); - - //Wait for a while before starting the test server - Thread.sleep(5000); + leader.start(); + + timing.sleepABit(); // Start the new server - server = new TestingServer(server.getPort()); + server = new TestingServer(server.getPort(), server.getTempDirectory()); Assert.assertTrue(timing.awaitLatch(leaderCounter), "Not elected leader"); - + Assert.assertEquals(leaderCount.get(), 1, "Elected too many times"); - Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times"); + Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times"); } - catch (Exception e) + catch ( Exception e ) { Assert.fail("Unexpected exception", e); } @@ -585,7 +591,7 @@ public class TestLeaderLatch extends BaseClassForTests CloseableUtils.closeQuietly(client); CloseableUtils.closeQuietly(server); } - } + } private enum Mode {