curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dragonsi...@apache.org
Subject [8/9] curator git commit: [CURATOR-160] Add EnsembleListener and EnsembleTracker. Implement a DynamicEnsembleProvider. TestReconfiguration now also tests the DynamicEnsembleProvider.
Date Mon, 17 Aug 2015 16:55:10 GMT
[CURATOR-160] Add EnsembleListener and EnsembleTracker. Implement a DynamicEnsembleProvider.
TestReconfiguration now also tests the DynamicEnsembleProvider.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4ec5ffe3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4ec5ffe3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4ec5ffe3

Branch: refs/heads/CURATOR-160
Commit: 4ec5ffe3a9d1c14a0a5acbe1ebb5552a284a8908
Parents: ec4083f
Author: Ioannis Canellos <iocanel@gmail.com>
Authored: Tue Nov 11 16:35:57 2014 +0200
Committer: Scott Blum <dragonsinth@apache.org>
Committed: Wed Aug 12 17:08:33 2015 -0400

----------------------------------------------------------------------
 .../curator/ensemble/EnsembleListener.java      |  24 +
 .../dynamic/DynamicEnsembleProvider.java        |  61 +++
 .../curator/framework/imps/EnsembleTracker.java | 167 +++++++
 .../framework/imps/TestReconfiguration.java     | 489 +++++++++++--------
 4 files changed, 538 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/4ec5ffe3/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
new file mode 100644
index 0000000..8f963cd
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.ensemble;
+
+public interface EnsembleListener {
+
+    void connectionStringUpdated(String connectionString);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4ec5ffe3/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
b/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
new file mode 100644
index 0000000..70b755f
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.ensemble.dynamic;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.ensemble.EnsembleProvider;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DynamicEnsembleProvider implements EnsembleProvider, EnsembleListener {
+
+    private final AtomicReference<String> connectionString = new AtomicReference<String>();
+
+    /**
+     * The connection string to use
+     *
+     * @param connectionString connection string
+     */
+    public DynamicEnsembleProvider(String connectionString)
+    {
+        this.connectionString.set(Preconditions.checkNotNull(connectionString, "connectionString cannot be null"));
+    }
+
+    @Override
+    public void start() throws Exception {
+        // NOP
+    }
+
+    @Override
+    public String getConnectionString() {
+        return connectionString.get();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // NOP
+    }
+
+    @Override
+    public void connectionStringUpdated(String connectionString) {
+        this.connectionString.set(connectionString);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4ec5ffe3/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
new file mode 100644
index 0000000..a789e42
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances.
+ */
+public class EnsembleTracker implements Closeable {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final ListenerContainer<EnsembleListener> listeners = new ListenerContainer<EnsembleListener>();
+    private final AtomicBoolean isConnected = new AtomicBoolean(true);
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState) {
+            if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
+                if (isConnected.compareAndSet(false, true)) {
+                    try {
+                        reset();
+                    } catch (Exception e) {
+                        log.error("Trying to reset after reconnection", e);
+                    }
+                }
+            } else {
+                isConnected.set(false);
+            }
+        }
+    };
+
+    private final CuratorWatcher watcher = new CuratorWatcher() {
+        @Override
+        public void process(WatchedEvent event) throws Exception {
+            reset();
+        }
+    };
+
+
+    private enum State {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    private final BackgroundCallback backgroundCallback = new BackgroundCallback() {
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+            processBackgroundResult(event);
+        }
+    };
+
+
+    public EnsembleTracker(CuratorFramework client) {
+        this.client = client;
+    }
+
+    public void start() throws Exception {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        reset();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (state.compareAndSet(State.STARTED, State.CLOSED)) {
+            listeners.clear();
+        }
+        client.getConnectionStateListenable().removeListener(connectionStateListener);
+    }
+
+    /**
+     * Return the ensemble listenable
+     *
+     * @return listenable
+     */
+    public ListenerContainer<EnsembleListener> getListenable()
+    {
+        Preconditions.checkState(state.get() != State.CLOSED, "Closed");
+
+        return listeners;
+    }
+
+    private void reset() throws Exception {
+        client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
+    }
+
+    private void processBackgroundResult(CuratorEvent event) throws Exception {
+        switch (event.getType()) {
+            case GET_CONFIG: {
+                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
+                    processConfigData(event.getData());
+                }
+            }
+        }
+    }
+
+    private void processConfigData(byte[] data) throws Exception {
+        Properties properties = new Properties();
+        properties.load(new ByteArrayInputStream(data));
+        QuorumVerifier qv = new QuorumMaj(properties);
+        StringBuilder sb = new StringBuilder();
+        for (QuorumPeer.QuorumServer server : qv.getAllMembers().values()) {
+            if (sb.length() != 0) {
+                sb.append(",");
+            }
+            sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
+        }
+
+        final String connectionString = sb.toString();
+        listeners.forEach
+                (
+                        new Function<EnsembleListener, Void>() {
+                            @Override
+                            public Void apply(EnsembleListener listener) {
+                                try {
+                                    listener.connectionStringUpdated(connectionString);
+                                } catch (Exception e) {
+                                    log.error("Calling listener", e);
+                                }
+                                return null;
+                            }
+                        }
+                );
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4ec5ffe3/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index e8896ae..faec551 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -18,6 +18,8 @@
  */
 package org.apache.curator.framework.imps;
 
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.ensemble.dynamic.DynamicEnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -35,246 +37,287 @@ import org.testng.annotations.Test;
 
 import java.io.IOException;
 import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class TestReconfiguration {
 
     TestingCluster cluster;
+    DynamicEnsembleProvider dynamicEnsembleProvider;
+    WaitOnDelegateListener waitOnDelegateListener;
+    EnsembleTracker ensembleTracker;
+    CuratorFramework client;
+
+    String connectionString1to5;
+    String connectionString2to5;
+    String connectionString3to5;
 
     @BeforeMethod
     public void setup() throws Exception {
         cluster = new TestingCluster(5);
         cluster.start();
+
+        connectionString1to5 = cluster.getConnectString();
+        connectionString2to5 = getConnectionString(cluster, 2,3,4,5);
+        connectionString3to5 = getConnectionString(cluster, 3,4,5);
+
+        dynamicEnsembleProvider = new DynamicEnsembleProvider(connectionString1to5);
+        client = CuratorFrameworkFactory.builder()
+                .ensembleProvider(dynamicEnsembleProvider)
+                .retryPolicy(new RetryOneTime(1))
+                .build();
+        client.start();
+        client.blockUntilConnected();
+
+        //Wrap around the dynamic ensemble provider, so that we can wait until it has received the event.
+        waitOnDelegateListener = new WaitOnDelegateListener(dynamicEnsembleProvider);
+        ensembleTracker = new EnsembleTracker(client);
+        ensembleTracker.getListenable().addListener(waitOnDelegateListener);
+        ensembleTracker.start();
+        //Wait for the initial event.
+        waitOnDelegateListener.waitForEvent();
     }
 
     @AfterMethod
     public void tearDown() throws IOException {
+        ensembleTracker.close();
+        client.close();
         cluster.close();
     }
 
     @Test
     public void testSyncIncremental() throws Exception {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
-        client.start();
-        client.blockUntilConnected();
-        try {
-            Stat stat = new Stat();
-            byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
-            Assert.assertNotNull(bytes);
-            QuorumVerifier qv = getQuorumVerifier(bytes);
-            Assert.assertEquals(qv.getAllMembers().size(), 5);
-            String server1 = getServerString(qv, cluster, 1L);
-            String server2 = getServerString(qv, cluster, 2L);
-
-            //Remove Servers
-            bytes = client.reconfig().leaving("1").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
-            qv = getQuorumVerifier(bytes);
-            Assert.assertEquals(qv.getAllMembers().size(), 4);
-            bytes = client.reconfig().leaving("2").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
-            qv = getQuorumVerifier(bytes);
-            Assert.assertEquals(qv.getAllMembers().size(), 3);
-
-            //Add Servers
-            bytes = client.reconfig().joining("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
-            qv = getQuorumVerifier(bytes);
-            Assert.assertEquals(qv.getAllMembers().size(), 4);
-            bytes = client.reconfig().joining("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
-            qv = getQuorumVerifier(bytes);
-            Assert.assertEquals(qv.getAllMembers().size(), 5);
-        } finally {
-            client.close();
-        }
+        Stat stat = new Stat();
+        byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
+        Assert.assertNotNull(bytes);
+        QuorumVerifier qv = getQuorumVerifier(bytes);
+        Assert.assertEquals(qv.getAllMembers().size(), 5);
+        String server1 = getServerString(qv, cluster, 1L);
+        String server2 = getServerString(qv, cluster, 2L);
+
+        //Remove Servers
+        bytes = client.reconfig().leaving("1").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+        qv = getQuorumVerifier(bytes);
+        Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+        bytes = client.reconfig().leaving("2").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+        qv = getQuorumVerifier(bytes);
+        Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+
+        //Add Servers
+        bytes = client.reconfig().joining("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+        qv = getQuorumVerifier(bytes);
+        Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+        bytes = client.reconfig().joining("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+        qv = getQuorumVerifier(bytes);
+        Assert.assertEquals(qv.getAllMembers().size(), 5);
+
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
     }
 
     @Test
     public void testAsyncIncremental() throws Exception {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
-        client.start();
-        client.blockUntilConnected();
-        try {
-            final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
-            final BackgroundCallback callback = new BackgroundCallback() {
-                @Override
-                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
-                    bytes.set(event.getData());
-                    ((CountDownLatch)event.getContext()).countDown();
+        final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
+        final BackgroundCallback callback = new BackgroundCallback() {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+                bytes.set(event.getData());
+                //We only need the latch on getConfig.
+                if (event.getContext() != null) {
+                    ((CountDownLatch) event.getContext()).countDown();
                 }
+            }
 
-            };
-
-            CountDownLatch latch = new CountDownLatch(1);
-            client.getConfig().inBackground(callback, latch).forEnsemble();
-            latch.await(5, TimeUnit.SECONDS);
-            Assert.assertNotNull(bytes.get());
-            QuorumVerifier qv = getQuorumVerifier(bytes.get());
-            Assert.assertEquals(qv.getAllMembers().size(), 5);
-            String server1 = getServerString(qv, cluster, 1L);
-            String server2 = getServerString(qv, cluster, 2L);
-
-
-            //Remove Servers
-            latch = new CountDownLatch(1);
-            client.reconfig().leaving("1").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
-            latch.await(5, TimeUnit.SECONDS);
-            qv = getQuorumVerifier(bytes.get());
-            Assert.assertEquals(qv.getAllMembers().size(), 4);
-            latch = new CountDownLatch(1);
-            client.reconfig().leaving("2").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
-            latch.await(5, TimeUnit.SECONDS);
-            qv = getQuorumVerifier(bytes.get());
-            Assert.assertEquals(qv.getAllMembers().size(), 3);
-
-            //Add Servers
-            latch = new CountDownLatch(1);
-            client.reconfig().joining("server.1=" + server1).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
-            latch.await(5, TimeUnit.SECONDS);
-            qv = getQuorumVerifier(bytes.get());
-            Assert.assertEquals(qv.getAllMembers().size(), 4);
-            latch = new CountDownLatch(1);
-            client.reconfig().joining("server.2=" + server2).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
-            latch.await(5, TimeUnit.SECONDS);
-            qv = getQuorumVerifier(bytes.get());
-            Assert.assertEquals(qv.getAllMembers().size(), 5);
-        } finally {
-            client.close();
-        }
+        };
+
+        CountDownLatch latch = new CountDownLatch(1);
+        client.getConfig().inBackground(callback, latch).forEnsemble();
+        latch.await(5, TimeUnit.SECONDS);
+        Assert.assertNotNull(bytes.get());
+        QuorumVerifier qv = getQuorumVerifier(bytes.get());
+        Assert.assertEquals(qv.getAllMembers().size(), 5);
+        String server1 = getServerString(qv, cluster, 1L);
+        String server2 = getServerString(qv, cluster, 2L);
+
+
+        //Remove Servers
+        client.reconfig().leaving("1").inBackground(callback).fromConfig(qv.getVersion()).forEnsemble();
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+        qv = getQuorumVerifier(bytes.get());
+        Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+        client.reconfig().leaving("2").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+        qv = getQuorumVerifier(bytes.get());
+        Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+        //Add Servers
+        client.reconfig().joining("server.2=" + server2).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+        qv = getQuorumVerifier(bytes.get());
+        Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+
+        client.reconfig().joining("server.1=" + server1).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
+        qv = getQuorumVerifier(bytes.get());
+        Assert.assertEquals(qv.getAllMembers().size(), 5);
     }
 
     @Test
     public void testSyncNonIncremental() throws Exception {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
-        client.start();
-        client.blockUntilConnected();
-        try {
-            Stat stat = new Stat();
-            byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
-            Assert.assertNotNull(bytes);
-            QuorumVerifier qv = getQuorumVerifier(bytes);
-            Assert.assertEquals(qv.getAllMembers().size(), 5);
-            String server1 = getServerString(qv, cluster, 1L);
-            String server2 = getServerString(qv, cluster, 2L);
-            String server3 = getServerString(qv, cluster, 3L);
-            String server4 = getServerString(qv, cluster, 4L);
-            String server5 = getServerString(qv, cluster, 5L);
-
-            //Remove Servers
-            bytes = client.reconfig()
-                    .withMembers("server.2=" + server2,
-                            "server.3=" + server3,
-                            "server.4=" + server4,
-                            "server.5=" + server5)
-                    .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
-            qv = getQuorumVerifier(bytes);
-            Assert.assertEquals(qv.getAllMembers().size(), 4);
-            bytes = client.reconfig()
-                    .withMembers("server.3=" + server3,
-                            "server.4=" + server4,
-                            "server.5=" + server5)
-                    .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
-
-            qv = getQuorumVerifier(bytes);
-            Assert.assertEquals(qv.getAllMembers().size(), 3);
-
-            //Add Servers
-            bytes = client.reconfig()
-                    .withMembers("server.1=" + server1,
-                            "server.3=" + server3,
-                            "server.4=" + server4,
-                            "server.5=" + server5)
-                    .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
-            qv = getQuorumVerifier(bytes);
-            Assert.assertEquals(qv.getAllMembers().size(), 4);
-            bytes = client.reconfig()
-                    .withMembers("server.1=" + server1,
-                            "server.2=" + server2,
-                            "server.3=" + server3,
-                            "server.4=" + server4,
-                            "server.5=" + server5)
-                    .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
-            qv = getQuorumVerifier(bytes);
-            Assert.assertEquals(qv.getAllMembers().size(), 5);
-        } finally {
-            client.close();
-        }
+        Stat stat = new Stat();
+        byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
+        Assert.assertNotNull(bytes);
+        QuorumVerifier qv = getQuorumVerifier(bytes);
+        Assert.assertEquals(qv.getAllMembers().size(), 5);
+        String server1 = getServerString(qv, cluster, 1L);
+        String server2 = getServerString(qv, cluster, 2L);
+        String server3 = getServerString(qv, cluster, 3L);
+        String server4 = getServerString(qv, cluster, 4L);
+        String server5 = getServerString(qv, cluster, 5L);
+
+        //Remove Servers
+        bytes = client.reconfig()
+                .withMembers("server.2=" + server2,
+                        "server.3=" + server3,
+                        "server.4=" + server4,
+                        "server.5=" + server5)
+                .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+        qv = getQuorumVerifier(bytes);
+        Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+        bytes = client.reconfig()
+                .withMembers("server.3=" + server3,
+                        "server.4=" + server4,
+                        "server.5=" + server5)
+                .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+
+        qv = getQuorumVerifier(bytes);
+        Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+
+        //Add Servers
+        bytes = client.reconfig()
+                .withMembers("server.2=" + server2,
+                        "server.3=" + server3,
+                        "server.4=" + server4,
+                        "server.5=" + server5)
+                .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+        qv = getQuorumVerifier(bytes);
+        Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+        bytes = client.reconfig()
+                .withMembers("server.1=" + server1,
+                        "server.2=" + server2,
+                        "server.3=" + server3,
+                        "server.4=" + server4,
+                        "server.5=" + server5)
+                .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble();
+        qv = getQuorumVerifier(bytes);
+        Assert.assertEquals(qv.getAllMembers().size(), 5);
+
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
     }
 
     @Test
     public void testAsyncNonIncremental() throws Exception {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1));
-        client.start();
-        client.blockUntilConnected();
-        try {
-            final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
-            final BackgroundCallback callback = new BackgroundCallback() {
-                @Override
-                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
-                    bytes.set(event.getData());
-                    ((CountDownLatch)event.getContext()).countDown();
-                }
+        final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
+        final BackgroundCallback callback = new BackgroundCallback() {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+                bytes.set(event.getData());
+                ((CountDownLatch) event.getContext()).countDown();
+            }
 
-            };
-
-            CountDownLatch latch = new CountDownLatch(1);
-            client.getConfig().inBackground(callback, latch).forEnsemble();
-            latch.await(5, TimeUnit.SECONDS);
-            Assert.assertNotNull(bytes.get());
-            QuorumVerifier qv = getQuorumVerifier(bytes.get());
-            Assert.assertEquals(qv.getAllMembers().size(), 5);
-            String server1 = getServerString(qv, cluster, 1L);
-            String server2 = getServerString(qv, cluster, 2L);
-            String server3 = getServerString(qv, cluster, 3L);
-            String server4 = getServerString(qv, cluster, 4L);
-            String server5 = getServerString(qv, cluster, 5L);
-
-            //Remove Servers
-            latch = new CountDownLatch(1);
-            client.reconfig()
-                    .withMembers("server.2=" + server2,
-                            "server.3=" + server3,
-                            "server.4=" + server4,
-                            "server.5=" + server5)
-            .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
-            latch.await(5, TimeUnit.SECONDS);
-            qv = getQuorumVerifier(bytes.get());
-            Assert.assertEquals(qv.getAllMembers().size(), 4);
-            latch = new CountDownLatch(1);
-            client.reconfig()
-                    .withMembers("server.3=" + server3,
-                            "server.4=" + server4,
-                            "server.5=" + server5)
-                    .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
-            latch.await(5, TimeUnit.SECONDS);
-            qv = getQuorumVerifier(bytes.get());
-            Assert.assertEquals(qv.getAllMembers().size(), 3);
-
-            //Add Servers
-            latch = new CountDownLatch(1);
-            client.reconfig()
-                    .withMembers("server.1=" + server1,
-                            "server.3=" + server3,
-                            "server.4=" + server4,
-                            "server.5=" + server5)
-                    .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
-            latch.await(5, TimeUnit.SECONDS);
-            qv = getQuorumVerifier(bytes.get());
-            Assert.assertEquals(qv.getAllMembers().size(), 4);
-            latch = new CountDownLatch(1);
-            client.reconfig()
-                    .withMembers("server.1=" + server1,
-                            "server.2=" + server2,
-                            "server.3=" + server3,
-                            "server.4=" + server4,
-                            "server.5=" + server5)
-                    .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
-            latch.await(5, TimeUnit.SECONDS);
-            qv = getQuorumVerifier(bytes.get());
-            Assert.assertEquals(qv.getAllMembers().size(), 5);
-        } finally {
-            client.close();
-        }
+        };
+
+        CountDownLatch latch = new CountDownLatch(1);
+        client.getConfig().inBackground(callback, latch).forEnsemble();
+        latch.await(5, TimeUnit.SECONDS);
+        Assert.assertNotNull(bytes.get());
+        QuorumVerifier qv = getQuorumVerifier(bytes.get());
+        Assert.assertEquals(qv.getAllMembers().size(), 5);
+        String server1 = getServerString(qv, cluster, 1L);
+        String server2 = getServerString(qv, cluster, 2L);
+        String server3 = getServerString(qv, cluster, 3L);
+        String server4 = getServerString(qv, cluster, 4L);
+        String server5 = getServerString(qv, cluster, 5L);
+
+        //Remove Servers
+        client.reconfig()
+                .withMembers("server.2=" + server2,
+                        "server.3=" + server3,
+                        "server.4=" + server4,
+                        "server.5=" + server5)
+                .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+        qv = getQuorumVerifier(bytes.get());
+        Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+        client.reconfig()
+                .withMembers("server.3=" + server3,
+                        "server.4=" + server4,
+                        "server.5=" + server5)
+                .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+        qv = getQuorumVerifier(bytes.get());
+        Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+        //Add Servers
+        client.reconfig()
+                .withMembers("server.2=" + server2,
+                        "server.3=" + server3,
+                        "server.4=" + server4,
+                        "server.5=" + server5)
+                .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+        qv = getQuorumVerifier(bytes.get());
+        Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+        client.reconfig()
+                .withMembers("server.1=" + server1,
+                        "server.2=" + server2,
+                        "server.3=" + server3,
+                        "server.4=" + server4,
+                        "server.5=" + server5)
+                .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble();
+        waitOnDelegateListener.waitForEvent();
+        Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
+        qv = getQuorumVerifier(bytes.get());
+        Assert.assertEquals(qv.getAllMembers().size(), 5);
     }
 
 
@@ -302,4 +345,44 @@ public class TestReconfiguration {
             return str + ";" + getInstance(cluster, (int) id).getConnectString();
         }
     }
+
+    /**
+     * Builds a comma separated connection string from the connect strings of the given servers.
+     *
+     * @param cluster the cluster whose instances are looked up by server id
+     * @param ids     the server ids to include, in the order they should appear in the result
+     * @return the matching instances' connect strings joined with ","; empty if no ids are given
+     * @throws IllegalArgumentException if an id does not match any instance of the cluster
+     */
+    static String getConnectionString(TestingCluster cluster, long... ids) throws Exception {
+        Map<Long, InstanceSpec> specs = new HashMap<Long, InstanceSpec>();
+        for (InstanceSpec spec : cluster.getInstances()) {
+            // Long.valueOf replaces the deprecated boxing constructor.
+            specs.put(Long.valueOf(spec.getServerId()), spec);
+        }
+        StringBuilder sb = new StringBuilder();
+        for (long id : ids) {
+            InstanceSpec spec = specs.get(id);
+            if (spec == null) {
+                // Fail with the offending id instead of an anonymous NullPointerException.
+                throw new IllegalArgumentException("No instance with server id " + id + " in cluster");
+            }
+            if (sb.length() != 0) {
+                sb.append(",");
+            }
+            sb.append(spec.getConnectString());
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Simple EnsembleListener that forwards every update to a delegate and lets a test
+     * block until the delegate has handled at least one update.
+     *
+     * Updates that arrive while nobody is waiting are remembered (coalesced into a single
+     * pending signal), so an update delivered before {@link #waitForEvent()} is called is
+     * not lost. The pending flag is only touched while holding {@code lock}, which makes
+     * the hand-off between the notifying thread and the waiting thread safe.
+     */
+    private static class WaitOnDelegateListener implements EnsembleListener {
+        private final Object lock = new Object();
+
+        private final EnsembleListener delegate;
+
+        // Guarded by lock: true when an update was delegated and not yet consumed.
+        private boolean eventPending;
+
+        private WaitOnDelegateListener(EnsembleListener delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void connectionStringUpdated(String connectionString) {
+            // Delegate first (outside the lock) so waiters only wake up once it is done.
+            delegate.connectionStringUpdated(connectionString);
+            synchronized (lock) {
+                eventPending = true;
+                lock.notifyAll();
+            }
+        }
+
+        /**
+         * Waits up to five seconds for the delegate to handle an update, then consumes it.
+         *
+         * @throws InterruptedException if the waiting thread is interrupted
+         * @throws TimeoutException if no update was handled within five seconds
+         */
+        public void waitForEvent() throws InterruptedException, TimeoutException {
+            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+            synchronized (lock) {
+                // Loop to cope with spurious wake-ups.
+                while (!eventPending) {
+                    long remainingNanos = deadline - System.nanoTime();
+                    if (remainingNanos <= 0) {
+                        throw new TimeoutException("Failed to receive event in time.");
+                    }
+                    TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
+                }
+                // Consume the signal atomically with the check so that a later update is seen.
+                eventPending = false;
+            }
+        }
+    }
 }
\ No newline at end of file


Mime
View raw message