curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dragonsi...@apache.org
Subject [11/27] curator git commit: Reworked the reconfig() APIs to be more like the rest of Curator
Date Mon, 17 Aug 2015 16:54:41 GMT
http://git-wip-us.apache.org/repos/asf/curator/blob/57dbf2e1/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index 44f9d00..297cf9b 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import org.apache.curator.ensemble.EnsembleListener;
@@ -27,6 +28,7 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -34,7 +36,6 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.HashMap;
@@ -45,32 +46,33 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class TestReconfiguration {
-
-    TestingCluster cluster;
-    DynamicEnsembleProvider dynamicEnsembleProvider;
-    WaitOnDelegateListener waitOnDelegateListener;
-    EnsembleTracker ensembleTracker;
-    CuratorFramework client;
+public class TestReconfiguration
+{
+    private TestingCluster cluster;
+    private DynamicEnsembleProvider dynamicEnsembleProvider;
+    private WaitOnDelegateListener waitOnDelegateListener;
+    private EnsembleTracker ensembleTracker;
+    private CuratorFramework client;
 
-    String connectionString1to5;
-    String connectionString2to5;
-    String connectionString3to5;
+    private String connectionString1to5;
+    private String connectionString2to5;
+    private String connectionString3to5;
 
     @BeforeMethod
-    public void setup() throws Exception {
+    public void setup() throws Exception
+    {
         cluster = new TestingCluster(5);
         cluster.start();
 
         connectionString1to5 = cluster.getConnectString();
-        connectionString2to5 = getConnectionString(cluster, 2,3,4,5);
-        connectionString3to5 = getConnectionString(cluster, 3,4,5);
+        connectionString2to5 = getConnectionString(cluster, 2, 3, 4, 5);
+        connectionString3to5 = getConnectionString(cluster, 3, 4, 5);
 
         dynamicEnsembleProvider = new DynamicEnsembleProvider(connectionString1to5);
         client = CuratorFrameworkFactory.builder()
-                .ensembleProvider(dynamicEnsembleProvider)
-                .retryPolicy(new RetryOneTime(1))
-                .build();
+            .ensembleProvider(dynamicEnsembleProvider)
+            .retryPolicy(new RetryOneTime(1))
+            .build();
         client.start();
         client.blockUntilConnected();
 
@@ -84,14 +86,16 @@ public class TestReconfiguration {
     }
 
     @AfterMethod
-    public void tearDown() throws IOException {
-        ensembleTracker.close();
-        client.close();
-        cluster.close();
+    public void tearDown() throws IOException
+    {
+        CloseableUtils.closeQuietly(ensembleTracker);
+        CloseableUtils.closeQuietly(client);
+        CloseableUtils.closeQuietly(cluster);
     }
 
     @Test
-    public void testSyncIncremental() throws Exception {
+    public void testSyncIncremental() throws Exception
+    {
         Stat stat = new Stat();
         byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
         Assert.assertNotNull(bytes);
@@ -132,15 +136,19 @@ public class TestReconfiguration {
     }
 
     @Test
-    public void testAsyncIncremental() throws Exception {
-        final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
-        final BackgroundCallback callback = new BackgroundCallback() {
+    public void testAsyncIncremental() throws Exception
+    {
+        final AtomicReference<byte[]> bytes = new AtomicReference<>();
+        final BackgroundCallback callback = new BackgroundCallback()
+        {
             @Override
-            public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception {
+            public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
+            {
                 bytes.set(event.getData());
                 //We only need the latch on getConfig.
-                if (event.getContext() != null) {
-                    ((CountDownLatch) event.getContext()).countDown();
+                if ( event.getContext() != null )
+                {
+                    ((CountDownLatch)event.getContext()).countDown();
                 }
             }
 
@@ -155,29 +163,27 @@ public class TestReconfiguration {
         String server1 = getServerString(qv, cluster, 1L);
         String server2 = getServerString(qv, cluster, 2L);
 
-
         //Remove Servers
-        client.reconfig().leaving("1").fromConfig(qv.getVersion()).inBackground(callback).forEnsemble();
+        client.reconfig().inBackground(callback).leaving("1").fromConfig(qv.getVersion()).forEnsemble();
         waitOnDelegateListener.waitForEvent();
         Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
         qv = getQuorumVerifier(bytes.get());
         Assert.assertEquals(qv.getAllMembers().size(), 4);
 
-        client.reconfig().leaving("2").fromConfig(qv.getVersion()).inBackground(callback,
latch).forEnsemble();
+        client.reconfig().inBackground(callback, latch).leaving("2").fromConfig(qv.getVersion()).forEnsemble();
         waitOnDelegateListener.waitForEvent();
         Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
         qv = getQuorumVerifier(bytes.get());
         Assert.assertEquals(qv.getAllMembers().size(), 3);
 
         //Add Servers
-        client.reconfig().joining("server.2=" + server2).fromConfig(qv.getVersion()).inBackground(callback,
latch).forEnsemble();
+        client.reconfig().inBackground(callback, latch).joining("server.2=" + server2).fromConfig(qv.getVersion()).forEnsemble();
         waitOnDelegateListener.waitForEvent();
         Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
         qv = getQuorumVerifier(bytes.get());
         Assert.assertEquals(qv.getAllMembers().size(), 4);
 
-
-        client.reconfig().joining("server.1=" + server1).fromConfig(qv.getVersion()).inBackground(callback,
latch).forEnsemble();
+        client.reconfig().inBackground(callback, latch).joining("server.1=" + server1).fromConfig(qv.getVersion()).forEnsemble();
         waitOnDelegateListener.waitForEvent();
         Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
         qv = getQuorumVerifier(bytes.get());
@@ -185,7 +191,8 @@ public class TestReconfiguration {
     }
 
     @Test
-    public void testSyncNonIncremental() throws Exception {
+    public void testSyncNonIncremental() throws Exception
+    {
         Stat stat = new Stat();
         byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
         Assert.assertNotNull(bytes);
@@ -199,11 +206,11 @@ public class TestReconfiguration {
 
         //Remove Servers
         bytes = client.reconfig()
-                .withMembers("server.2=" + server2,
-                        "server.3=" + server3,
-                        "server.4=" + server4,
-                        "server.5=" + server5)
-                .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+            .adding("server.2=" + server2,
+                "server.3=" + server3,
+                "server.4=" + server4,
+                "server.5=" + server5)
+            .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
         qv = getQuorumVerifier(bytes);
         Assert.assertEquals(qv.getAllMembers().size(), 4);
 
@@ -211,10 +218,10 @@ public class TestReconfiguration {
         Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
 
         bytes = client.reconfig()
-                .withMembers("server.3=" + server3,
-                        "server.4=" + server4,
-                        "server.5=" + server5)
-                .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+            .adding("server.3=" + server3,
+                "server.4=" + server4,
+                "server.5=" + server5)
+            .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
 
         qv = getQuorumVerifier(bytes);
         Assert.assertEquals(qv.getAllMembers().size(), 3);
@@ -224,11 +231,11 @@ public class TestReconfiguration {
 
         //Add Servers
         bytes = client.reconfig()
-                .withMembers("server.2=" + server2,
-                        "server.3=" + server3,
-                        "server.4=" + server4,
-                        "server.5=" + server5)
-                .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+            .adding("server.2=" + server2,
+                "server.3=" + server3,
+                "server.4=" + server4,
+                "server.5=" + server5)
+            .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
         qv = getQuorumVerifier(bytes);
         Assert.assertEquals(qv.getAllMembers().size(), 4);
 
@@ -236,12 +243,12 @@ public class TestReconfiguration {
         Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
 
         bytes = client.reconfig()
-                .withMembers("server.1=" + server1,
-                        "server.2=" + server2,
-                        "server.3=" + server3,
-                        "server.4=" + server4,
-                        "server.5=" + server5)
-                .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+            .adding("server.1=" + server1,
+                "server.2=" + server2,
+                "server.3=" + server3,
+                "server.4=" + server4,
+                "server.5=" + server5)
+            .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
         qv = getQuorumVerifier(bytes);
         Assert.assertEquals(qv.getAllMembers().size(), 5);
 
@@ -250,13 +257,16 @@ public class TestReconfiguration {
     }
 
     @Test
-    public void testAsyncNonIncremental() throws Exception {
-        final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
-        final BackgroundCallback callback = new BackgroundCallback() {
+    public void testAsyncNonIncremental() throws Exception
+    {
+        final AtomicReference<byte[]> bytes = new AtomicReference<>();
+        final BackgroundCallback callback = new BackgroundCallback()
+        {
             @Override
-            public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception {
+            public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
+            {
                 bytes.set(event.getData());
-                ((CountDownLatch) event.getContext()).countDown();
+                ((CountDownLatch)event.getContext()).countDown();
             }
 
         };
@@ -274,86 +284,97 @@ public class TestReconfiguration {
         String server5 = getServerString(qv, cluster, 5L);
 
         //Remove Servers
-        client.reconfig()
-                .withMembers("server.2=" + server2,
-                        "server.3=" + server3,
-                        "server.4=" + server4,
-                        "server.5=" + server5)
-                .fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble();
+        client.reconfig().inBackground(callback, latch)
+            .adding("server.2=" + server2,
+                "server.3=" + server3,
+                "server.4=" + server4,
+                "server.5=" + server5)
+            .fromConfig(qv.getVersion()).forEnsemble();
         waitOnDelegateListener.waitForEvent();
         Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
         qv = getQuorumVerifier(bytes.get());
         Assert.assertEquals(qv.getAllMembers().size(), 4);
 
-        client.reconfig()
-                .withMembers("server.3=" + server3,
-                        "server.4=" + server4,
-                        "server.5=" + server5)
-                .fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble();
+        client.reconfig().inBackground(callback, latch)
+            .adding("server.3=" + server3,
+                "server.4=" + server4,
+                "server.5=" + server5)
+            .fromConfig(qv.getVersion()).forEnsemble();
         waitOnDelegateListener.waitForEvent();
         Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
         qv = getQuorumVerifier(bytes.get());
         Assert.assertEquals(qv.getAllMembers().size(), 3);
 
         //Add Servers
-        client.reconfig()
-                .withMembers("server.2=" + server2,
-                        "server.3=" + server3,
-                        "server.4=" + server4,
-                        "server.5=" + server5)
-                .fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble();
+        client.reconfig().inBackground(callback, latch)
+            .adding("server.2=" + server2,
+                "server.3=" + server3,
+                "server.4=" + server4,
+                "server.5=" + server5)
+            .fromConfig(qv.getVersion()).forEnsemble();
         waitOnDelegateListener.waitForEvent();
         Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
         qv = getQuorumVerifier(bytes.get());
         Assert.assertEquals(qv.getAllMembers().size(), 4);
 
-        client.reconfig()
-                .withMembers("server.1=" + server1,
-                        "server.2=" + server2,
-                        "server.3=" + server3,
-                        "server.4=" + server4,
-                        "server.5=" + server5)
-                .fromConfig(qv.getVersion()).inBackground(callback, latch).forEnsemble();
+        client.reconfig().inBackground(callback, latch)
+            .adding("server.1=" + server1,
+                "server.2=" + server2,
+                "server.3=" + server3,
+                "server.4=" + server4,
+                "server.5=" + server5)
+            .fromConfig(qv.getVersion()).forEnsemble();
         waitOnDelegateListener.waitForEvent();
         Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
         qv = getQuorumVerifier(bytes.get());
         Assert.assertEquals(qv.getAllMembers().size(), 5);
     }
 
-
-    static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception {
+    static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception
+    {
         Properties properties = new Properties();
         properties.load(new StringReader(new String(bytes)));
         return new QuorumMaj(properties);
     }
 
-    static InstanceSpec getInstance(TestingCluster cluster, int id) {
-        for (InstanceSpec spec : cluster.getInstances()) {
-            if (spec.getServerId() == id) {
+    static InstanceSpec getInstance(TestingCluster cluster, int id)
+    {
+        for ( InstanceSpec spec : cluster.getInstances() )
+        {
+            if ( spec.getServerId() == id )
+            {
                 return spec;
             }
         }
         throw new IllegalStateException("InstanceSpec with id:" + id + " not found");
     }
 
-    static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws
Exception {
+    static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws
Exception
+    {
         String str = qv.getAllMembers().get(id).toString();
         //check if connection string is already there.
-        if (str.contains(";")) {
+        if ( str.contains(";") )
+        {
             return str;
-        } else {
-            return str + ";" + getInstance(cluster, (int) id).getConnectString();
+        }
+        else
+        {
+            return str + ";" + getInstance(cluster, (int)id).getConnectString();
         }
     }
 
-    static String getConnectionString(TestingCluster cluster, long... ids) throws Exception
{
+    static String getConnectionString(TestingCluster cluster, long... ids) throws Exception
+    {
         StringBuilder sb = new StringBuilder();
-        Map<Long, InstanceSpec> specs = new HashMap<Long, InstanceSpec>();
-        for (InstanceSpec spec : cluster.getInstances()) {
-            specs.put(new Long(spec.getServerId()), spec);
+        Map<Long, InstanceSpec> specs = new HashMap<>();
+        for ( InstanceSpec spec : cluster.getInstances() )
+        {
+            specs.put((long)spec.getServerId(), spec);
         }
-        for (long id : ids) {
-            if (sb.length() != 0) {
+        for ( long id : ids )
+        {
+            if ( sb.length() != 0 )
+            {
                 sb.append(",");
             }
             sb.append(specs.get(id).getConnectString());
@@ -362,27 +383,34 @@ public class TestReconfiguration {
     }
 
     //Simple EnsembleListener that can wait until the delegate handles the event.
-    private static class WaitOnDelegateListener implements EnsembleListener {
+    private static class WaitOnDelegateListener implements EnsembleListener
+    {
         private CountDownLatch latch = new CountDownLatch(1);
 
         private final EnsembleListener delegate;
 
-        private WaitOnDelegateListener(EnsembleListener delegate) {
+        private WaitOnDelegateListener(EnsembleListener delegate)
+        {
             this.delegate = delegate;
         }
 
         @Override
-        public void connectionStringUpdated(String connectionString) {
+        public void connectionStringUpdated(String connectionString)
+        {
             delegate.connectionStringUpdated(connectionString);
             latch.countDown();
         }
 
-        public void waitForEvent() throws InterruptedException, TimeoutException {
-            if (latch.await(5, TimeUnit.SECONDS)) {
+        public void waitForEvent() throws InterruptedException, TimeoutException
+        {
+            if ( latch.await(5, TimeUnit.SECONDS) )
+            {
                 latch = new CountDownLatch(1);
-            } else {
+            }
+            else
+            {
                 throw new TimeoutException("Failed to receive event in time.");
             }
         }
-    };
+    }
 }
\ No newline at end of file


Mime
View raw message