Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 06705177E3 for ; Tue, 12 May 2015 13:54:02 +0000 (UTC) Received: (qmail 57482 invoked by uid 500); 12 May 2015 13:54:02 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 57313 invoked by uid 500); 12 May 2015 13:54:02 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 57003 invoked by uid 99); 12 May 2015 13:54:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 May 2015 13:54:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A34EFE2F1B; Tue, 12 May 2015 13:54:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Tue, 12 May 2015 13:54:18 -0000 Message-Id: <37629fee888143278cccb232fa3c5a82@git.apache.org> In-Reply-To: <4f78bbf8221e442385ad54149899906d@git.apache.org> References: <4f78bbf8221e442385ad54149899906d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [17/25] curator git commit: Reworked the reconfig() APIs to be more like the rest of Curator http://git-wip-us.apache.org/repos/asf/curator/blob/aaa04872/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index 44f9d00..297cf9b 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.imps; import org.apache.curator.ensemble.EnsembleListener; @@ -27,6 +28,7 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; +import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; @@ -34,7 +36,6 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; - import java.io.IOException; import java.io.StringReader; import java.util.HashMap; @@ -45,32 +46,33 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -public class TestReconfiguration { - - TestingCluster cluster; - DynamicEnsembleProvider dynamicEnsembleProvider; - WaitOnDelegateListener waitOnDelegateListener; - EnsembleTracker ensembleTracker; - CuratorFramework client; +public class TestReconfiguration +{ + private TestingCluster cluster; + private DynamicEnsembleProvider dynamicEnsembleProvider; + private WaitOnDelegateListener waitOnDelegateListener; + private EnsembleTracker ensembleTracker; + private CuratorFramework client; - String connectionString1to5; - String connectionString2to5; - String connectionString3to5; + private String connectionString1to5; + private String connectionString2to5; + private String connectionString3to5; @BeforeMethod - public void setup() throws Exception { + public void setup() throws Exception + { cluster = new TestingCluster(5); cluster.start(); connectionString1to5 = cluster.getConnectString(); - connectionString2to5 = getConnectionString(cluster, 2,3,4,5); - connectionString3to5 = getConnectionString(cluster, 3,4,5); + connectionString2to5 = getConnectionString(cluster, 2, 3, 4, 5); + connectionString3to5 = getConnectionString(cluster, 3, 4, 5); dynamicEnsembleProvider = new DynamicEnsembleProvider(connectionString1to5); client = CuratorFrameworkFactory.builder() - .ensembleProvider(dynamicEnsembleProvider) - .retryPolicy(new RetryOneTime(1)) - .build(); + .ensembleProvider(dynamicEnsembleProvider) + .retryPolicy(new RetryOneTime(1)) + .build(); client.start(); client.blockUntilConnected(); @@ -84,14 +86,16 @@ public class TestReconfiguration { } @AfterMethod - public void tearDown() throws IOException { - ensembleTracker.close(); - client.close(); - cluster.close(); + public void tearDown() throws IOException + { + CloseableUtils.closeQuietly(ensembleTracker); + CloseableUtils.closeQuietly(client); + CloseableUtils.closeQuietly(cluster); } @Test - public void testSyncIncremental() throws Exception { + public void testSyncIncremental() throws Exception + { Stat stat = new Stat(); byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); Assert.assertNotNull(bytes); @@ -132,15 +136,19 @@ public class TestReconfiguration { } @Test - public void testAsyncIncremental() throws Exception { - final AtomicReference bytes = new AtomicReference(); - final BackgroundCallback callback = new BackgroundCallback() { + public void testAsyncIncremental() throws Exception + { + final AtomicReference bytes = new AtomicReference<>(); + final BackgroundCallback callback = new BackgroundCallback() + { @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { bytes.set(event.getData()); //We only need the latch on getConfig. - if (event.getContext() != null) { - ((CountDownLatch) event.getContext()).countDown(); + if ( event.getContext() != null ) + { + ((CountDownLatch)event.getContext()).countDown(); } } @@ -155,29 +163,27 @@ public class TestReconfiguration { String server1 = getServerString(qv, cluster, 1L); String server2 = getServerString(qv, cluster, 2L); - //Remove Servers - client.reconfig().leaving("1").fromConfig(qv.getVersion()).inBackground(callback).forEnsemble(); + client.reconfig().inBackground(callback).leaving("1").fromConfig(qv.getVersion()).forEnsemble(); waitOnDelegateListener.waitForEvent(); Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); qv = getQuorumVerifier(bytes.get()); Assert.assertEquals(qv.getAllMembers().size(), 4); - client.reconfig().leaving("2").fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble(); + client.reconfig().inBackground(callback, latch).leaving("2").fromConfig(qv.getVersion()).forEnsemble(); waitOnDelegateListener.waitForEvent(); Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); qv = getQuorumVerifier(bytes.get()); Assert.assertEquals(qv.getAllMembers().size(), 3); //Add Servers - client.reconfig().joining("server.2=" + server2).fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble(); + client.reconfig().inBackground(callback, latch).joining("server.2=" + server2).fromConfig(qv.getVersion()).forEnsemble(); waitOnDelegateListener.waitForEvent(); Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); qv = getQuorumVerifier(bytes.get()); Assert.assertEquals(qv.getAllMembers().size(), 4); - - client.reconfig().joining("server.1=" + server1).fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble(); + client.reconfig().inBackground(callback, latch).joining("server.1=" + server1).fromConfig(qv.getVersion()).forEnsemble(); waitOnDelegateListener.waitForEvent(); Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); qv = getQuorumVerifier(bytes.get()); @@ -185,7 +191,8 @@ public class TestReconfiguration { } @Test - public void testSyncNonIncremental() throws Exception { + public void testSyncNonIncremental() throws Exception + { Stat stat = new Stat(); byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); Assert.assertNotNull(bytes); @@ -199,11 +206,11 @@ public class TestReconfiguration { //Remove Servers bytes = client.reconfig() - .withMembers("server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); + .adding("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); qv = getQuorumVerifier(bytes); Assert.assertEquals(qv.getAllMembers().size(), 4); @@ -211,10 +218,10 @@ public class TestReconfiguration { Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); bytes = client.reconfig() - .withMembers("server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); + .adding("server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); qv = getQuorumVerifier(bytes); Assert.assertEquals(qv.getAllMembers().size(), 3); @@ -224,11 +231,11 @@ public class TestReconfiguration { //Add Servers bytes = client.reconfig() - .withMembers("server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); + .adding("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); qv = getQuorumVerifier(bytes); Assert.assertEquals(qv.getAllMembers().size(), 4); @@ -236,12 +243,12 @@ public class TestReconfiguration { Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); bytes = client.reconfig() - .withMembers("server.1=" + server1, - "server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); + .adding("server.1=" + server1, + "server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble(); qv = getQuorumVerifier(bytes); Assert.assertEquals(qv.getAllMembers().size(), 5); @@ -250,13 +257,16 @@ public class TestReconfiguration { } @Test - public void testAsyncNonIncremental() throws Exception { - final AtomicReference bytes = new AtomicReference(); - final BackgroundCallback callback = new BackgroundCallback() { + public void testAsyncNonIncremental() throws Exception + { + final AtomicReference bytes = new AtomicReference<>(); + final BackgroundCallback callback = new BackgroundCallback() + { @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { bytes.set(event.getData()); - ((CountDownLatch) event.getContext()).countDown(); + ((CountDownLatch)event.getContext()).countDown(); } }; @@ -274,86 +284,97 @@ public class TestReconfiguration { String server5 = getServerString(qv, cluster, 5L); //Remove Servers - client.reconfig() - .withMembers("server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble(); + client.reconfig().inBackground(callback, latch) + .adding("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).forEnsemble(); waitOnDelegateListener.waitForEvent(); Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); qv = getQuorumVerifier(bytes.get()); Assert.assertEquals(qv.getAllMembers().size(), 4); - client.reconfig() - .withMembers("server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble(); + client.reconfig().inBackground(callback, latch) + .adding("server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).forEnsemble(); waitOnDelegateListener.waitForEvent(); Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); qv = getQuorumVerifier(bytes.get()); Assert.assertEquals(qv.getAllMembers().size(), 3); //Add Servers - client.reconfig() - .withMembers("server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble(); + client.reconfig().inBackground(callback, latch) + .adding("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).forEnsemble(); waitOnDelegateListener.waitForEvent(); Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); qv = getQuorumVerifier(bytes.get()); Assert.assertEquals(qv.getAllMembers().size(), 4); - client.reconfig() - .withMembers("server.1=" + server1, - "server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble(); + client.reconfig().inBackground(callback, latch) + .adding("server.1=" + server1, + "server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .fromConfig(qv.getVersion()).forEnsemble(); waitOnDelegateListener.waitForEvent(); Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); qv = getQuorumVerifier(bytes.get()); Assert.assertEquals(qv.getAllMembers().size(), 5); } - - static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception { + static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception + { Properties properties = new Properties(); properties.load(new StringReader(new String(bytes))); return new QuorumMaj(properties); } - static InstanceSpec getInstance(TestingCluster cluster, int id) { - for (InstanceSpec spec : cluster.getInstances()) { - if (spec.getServerId() == id) { + static InstanceSpec getInstance(TestingCluster cluster, int id) + { + for ( InstanceSpec spec : cluster.getInstances() ) + { + if ( spec.getServerId() == id ) + { return spec; } } throw new IllegalStateException("InstanceSpec with id:" + id + " not found"); } - static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception { + static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception + { String str = qv.getAllMembers().get(id).toString(); //check if connection string is already there. - if (str.contains(";")) { + if ( str.contains(";") ) + { return str; - } else { - return str + ";" + getInstance(cluster, (int) id).getConnectString(); + } + else + { + return str + ";" + getInstance(cluster, (int)id).getConnectString(); } } - static String getConnectionString(TestingCluster cluster, long... ids) throws Exception { + static String getConnectionString(TestingCluster cluster, long... ids) throws Exception + { StringBuilder sb = new StringBuilder(); - Map specs = new HashMap(); - for (InstanceSpec spec : cluster.getInstances()) { - specs.put(new Long(spec.getServerId()), spec); + Map specs = new HashMap<>(); + for ( InstanceSpec spec : cluster.getInstances() ) + { + specs.put((long)spec.getServerId(), spec); } - for (long id : ids) { - if (sb.length() != 0) { + for ( long id : ids ) + { + if ( sb.length() != 0 ) + { sb.append(","); } sb.append(specs.get(id).getConnectString()); @@ -362,27 +383,34 @@ public class TestReconfiguration { } //Simple EnsembleListener that can wait until the delegate handles the event. - private static class WaitOnDelegateListener implements EnsembleListener { + private static class WaitOnDelegateListener implements EnsembleListener + { private CountDownLatch latch = new CountDownLatch(1); private final EnsembleListener delegate; - private WaitOnDelegateListener(EnsembleListener delegate) { + private WaitOnDelegateListener(EnsembleListener delegate) + { this.delegate = delegate; } @Override - public void connectionStringUpdated(String connectionString) { + public void connectionStringUpdated(String connectionString) + { delegate.connectionStringUpdated(connectionString); latch.countDown(); } - public void waitForEvent() throws InterruptedException, TimeoutException { - if (latch.await(5, TimeUnit.SECONDS)) { + public void waitForEvent() throws InterruptedException, TimeoutException + { + if ( latch.await(5, TimeUnit.SECONDS) ) + { latch = new CountDownLatch(1); - } else { + } + else + { throw new TimeoutException("Failed to receive event in time."); } } - }; + } } \ No newline at end of file