zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mic...@apache.org
Subject svn commit: r1584497 [9/9] - in /zookeeper/trunk: bin/ src/c/ src/contrib/loggraph/web/org/apache/zookeeper/graph/resources/ src/contrib/zooinspector/ src/contrib/zooinspector/lib/ src/contrib/zooinspector/src/java/org/apache/zookeeper/inspector/ src/c...
Date Fri, 04 Apr 2014 01:24:38 GMT
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReconfigTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReconfigTest.java?rev=1584497&r1=1584496&r2=1584497&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReconfigTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReconfigTest.java Fri Apr  4 01:24:37 2014
@@ -1,736 +1,736 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
-import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.junit.Assert;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReconfigTest extends ZKTestCase implements DataCallback{
-    private static final Logger LOG = LoggerFactory
-            .getLogger(ReconfigTest.class);
-
-    private QuorumUtil qu;
-
-    @After
-    public void tearDown() throws Exception {
-        if (qu != null) {
-            qu.tearDown();
-        }
-    }
-
-    public static String reconfig(ZooKeeper zk, List<String> joiningServers,
-            List<String> leavingServers, List<String> newMembers, long fromConfig)
-            throws KeeperException, InterruptedException {
-        byte[] config = null;
-        for (int j = 0; j < 30; j++) {
-            try {
-                config = zk.reconfig(joiningServers, leavingServers,
-                        newMembers, fromConfig, new Stat());
-                break;
-            } catch (KeeperException.ConnectionLossException e) {
-                if (j < 29) {
-                    Thread.sleep(1000);
-                } else {
-                    // test fails if we still can't connect to the quorum after
-                    // 30 seconds.
-                    Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
-                }
-            }
-        }
-
-        String configStr = new String(config);
-        if (joiningServers != null) {
-            for (String joiner : joiningServers)
-                Assert.assertTrue(configStr.contains(joiner));
-        }
-        if (leavingServers != null) {
-            for (String leaving : leavingServers)
-                Assert.assertFalse(configStr.contains("server.".concat(leaving)));
-        }
-
-        return configStr;
-    }
-
-    public static String testServerHasConfig(ZooKeeper zk,
-            List<String> joiningServers, List<String> leavingServers)
-            throws KeeperException, InterruptedException {
-        byte[] config = null;
-        for (int j = 0; j < 30; j++) {
-            try {
-                zk.sync("/", null, null);
-                config = zk.getConfig(false, new Stat());
-                break;
-            } catch (KeeperException.ConnectionLossException e) {
-                if (j < 29) {
-                    Thread.sleep(1000);
-                } else {
-                    // test fails if we still can't connect to the quorum after
-                    // 30 seconds.
-                    Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
-                }
-            }
-
-        }
-        String configStr = new String(config);
-        if (joiningServers != null) {
-            for (String joiner : joiningServers) {
-               Assert.assertTrue(configStr.contains(joiner));
-            }
-        }
-        if (leavingServers != null) {
-            for (String leaving : leavingServers)
-                Assert.assertFalse(configStr.contains("server.".concat(leaving)));
-        }
-
-        return configStr;
-    }
-    
-    public static void testNormalOperation(ZooKeeper writer, ZooKeeper reader)
-            throws KeeperException, InterruptedException {
-        boolean testNodeExists = false;
-       
-       for (int j = 0; j < 30; j++) {
-            try {
-               if (!testNodeExists) {
-                   try{ 
-                       writer.create("/test", "test".getBytes(),
-                           ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                   } catch (KeeperException.NodeExistsException e) {                       
-                   }
-                   testNodeExists = true;
-               }
-                String data = "test" + j;
-                writer.setData("/test", data.getBytes(), -1);
-                reader.sync("/", null, null);
-                byte[] res = reader.getData("/test", null, new Stat());
-                Assert.assertEquals(data, new String(res));
-                break;
-            } catch (KeeperException.ConnectionLossException e) {
-                if (j < 29) {
-                    Thread.sleep(1000);
-                } else {
-                    // test fails if we still can't connect to the quorum after
-                    // 30 seconds.
-                    Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
-                }
-            }
-
-        }
-
-    }    
-    
-    private int getLeaderId(QuorumUtil qu) {
-        int leaderId = 1;
-        while (qu.getPeer(leaderId).peer.leader == null)
-            leaderId++;
-        return leaderId;
-    }
-
-    private ZooKeeper[] createHandles(QuorumUtil qu) throws IOException {
-        // create an extra handle, so we can index the handles from 1 to qu.ALL
-        // using the server id.
-        ZooKeeper[] zkArr = new ZooKeeper[qu.ALL + 1];
-        zkArr[0] = null; // not used.
-        for (int i = 1; i <= qu.ALL; i++) {
-            // server ids run from 1 to qu.ALL
-            zkArr[i] = new ZooKeeper("127.0.0.1:"
-                    + qu.getPeer(i).peer.getClientPort(),
-                    ClientBase.CONNECTION_TIMEOUT, new Watcher() {
-                        public void process(WatchedEvent event) {
-                        }});
-        }
-        return zkArr;
-    }
-
-    private void closeAllHandles(ZooKeeper[] zkArr) throws InterruptedException {
-        for (ZooKeeper zk : zkArr)
-            if (zk != null)
-                zk.close();
-    }
-
- 
-    @Test
-    public void testRemoveAddOne() throws Exception {
-        qu = new QuorumUtil(1); // create 3 servers
-        qu.disableJMXTest = true;
-        qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-
-        List<String> leavingServers = new ArrayList<String>();
-        List<String> joiningServers = new ArrayList<String>();
-
-        int leaderIndex = getLeaderId(qu);
-
-        // during first iteration, leavingIndex will correspond to a follower
-        // during second iteration leavingIndex will be the index of the leader
-        int leavingIndex = (leaderIndex == 1) ? 2 : 1;
-
-        for (int i = 0; i < 2; i++) {
-            // some of the operations will be executed by a client connected to
-            // the removed server
-            // while others are invoked by a client connected to some other
-            // server.
-            // when we're removing the leader, zk1 will be the client connected
-            // to removed server
-            ZooKeeper zk1 = (leavingIndex == leaderIndex) ? zkArr[leaderIndex]
-                    : zkArr[(leaderIndex % qu.ALL) + 1];
-            ZooKeeper zk2 = (leavingIndex == leaderIndex) ? zkArr[(leaderIndex % qu.ALL) + 1]
-                    : zkArr[leaderIndex];
-
-            leavingServers.add(Integer.toString(leavingIndex));
-
-            // remember this server so we can add it back later
-            joiningServers.add("server."
-                    + leavingIndex
-                    + "=localhost:"
-                    + qu.getPeer(leavingIndex).peer.getQuorumAddress()
-                            .getPort()
-                    + ":"
-                    + qu.getPeer(leavingIndex).peer.getElectionAddress()
-                            .getPort() + ":participant;localhost:"
-                    + qu.getPeer(leavingIndex).peer.getClientPort());
-
-            String configStr = reconfig(zk1, null, leavingServers, null, -1);
-            testServerHasConfig(zk2, null, leavingServers);
-            testNormalOperation(zk2, zk1);
-
-            QuorumVerifier qv = qu.getPeer(1).peer.configFromString(configStr);
-            long version = qv.getVersion();
-
-            // checks that conditioning on version works properly
-            try {
-                reconfig(zk2, joiningServers, null, null, version + 1);
-                Assert.fail("reconfig succeeded even though version condition was incorrect!");
-            } catch (KeeperException.BadVersionException e) {
-
-            }
-
-            reconfig(zk2, joiningServers, null, null, version);
-
-            testNormalOperation(zk1, zk2);
-            testServerHasConfig(zk1, joiningServers, null);
-
-            // second iteration of the loop will remove the leader
-            // and add it back (as follower)
-            leavingIndex = leaderIndex = getLeaderId(qu);
-            leavingServers.clear();
-            joiningServers.clear();
-        }
-
-        closeAllHandles(zkArr);
-    }
-
-    /**
-     * 1. removes and adds back two servers (incl leader). One of the servers is added back as observer
-     * 2. tests that reconfig fails if quorum of new config is not up
-     * 3. tests that a server that's not up during reconfig learns the new config when it comes up
-     * @throws Exception
-     */
-    @Test
-    public void testRemoveAddTwo() throws Exception {
-        qu = new QuorumUtil(2); // create 5 servers
-        qu.disableJMXTest = true;
-        qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-
-        List<String> leavingServers = new ArrayList<String>();
-        List<String> joiningServers = new ArrayList<String>();
-
-        int leaderIndex = getLeaderId(qu);
-
-        // let's remove the leader and some other server
-        int leavingIndex1 = leaderIndex;
-        int leavingIndex2 = (leaderIndex == 1) ? 2 : 1;
-
-        // find some server that's staying
-        int stayingIndex1 = 1, stayingIndex2 = 1, stayingIndex3 = 1;
-        while (stayingIndex1 == leavingIndex1 || stayingIndex1 == leavingIndex2)
-            stayingIndex1++;
-
-        while (stayingIndex2 == leavingIndex1 || stayingIndex2 == leavingIndex2
-                || stayingIndex2 == stayingIndex1)
-            stayingIndex2++;
-
-        while (stayingIndex3 == leavingIndex1 || stayingIndex3 == leavingIndex2
-                || stayingIndex3 == stayingIndex1
-                || stayingIndex3 == stayingIndex2)
-            stayingIndex3++;
-
-        leavingServers.add(Integer.toString(leavingIndex1));
-        leavingServers.add(Integer.toString(leavingIndex2));
-
-        // remember these servers so we can add them back later
-        joiningServers.add("server." + leavingIndex1 + "=localhost:"
-                + qu.getPeer(leavingIndex1).peer.getQuorumAddress().getPort()
-                + ":"
-                + qu.getPeer(leavingIndex1).peer.getElectionAddress().getPort()
-                + ":participant;localhost:"
-                + qu.getPeer(leavingIndex1).peer.getClientPort());
-
-        // this server will be added back as an observer
-        joiningServers.add("server." + leavingIndex2 + "=localhost:"
-                + qu.getPeer(leavingIndex2).peer.getQuorumAddress().getPort()
-                + ":"
-                + qu.getPeer(leavingIndex2).peer.getElectionAddress().getPort()
-                + ":observer;localhost:"
-                + qu.getPeer(leavingIndex2).peer.getClientPort());
-
-        qu.shutdown(leavingIndex1);
-        qu.shutdown(leavingIndex2);
-
-        // 3 servers still up so this should work
-        reconfig(zkArr[stayingIndex2], null, leavingServers, null, -1);
-        
-        qu.shutdown(stayingIndex2);
-
-        // the following commands would not work in the original
-        // cluster of 5, but now that we've removed 2 servers
-        // we have a cluster of 3 servers and one of them is allowed to fail
-
-        testServerHasConfig(zkArr[stayingIndex1], null, leavingServers);
-        testServerHasConfig(zkArr[stayingIndex3], null, leavingServers);
-        testNormalOperation(zkArr[stayingIndex1], zkArr[stayingIndex3]);
-        
-        // this is a test that a reconfig will only succeed
-        // if there is a quorum up in new config. Below there is no
-        // quorum so it should fail
-        
-        // the sleep is necessary so that the leader figures out
-        // that the switched off servers are down
-        Thread.sleep(10000);
-
-        try {
-            reconfig(zkArr[stayingIndex1], joiningServers, null, null, -1);
-            Assert.fail("reconfig completed successfully even though there is no quorum up in new config!");
-        } catch (KeeperException.NewConfigNoQuorum e) {
-
-        }
-        
-        // now start the third server so that new config has quorum
-        qu.restart(stayingIndex2);
-
-        reconfig(zkArr[stayingIndex1], joiningServers, null, null, -1);
-        testNormalOperation(zkArr[stayingIndex2], zkArr[stayingIndex3]);
-        testServerHasConfig(zkArr[stayingIndex2], joiningServers, null);
-
-        // this server wasn't around during the configuration change
-        // we should check that it is able to connect, finds out
-        // about the change and becomes an observer.
-
-        qu.restart(leavingIndex2);
-        Assert.assertTrue(qu.getPeer(leavingIndex2).peer.getPeerState() == ServerState.OBSERVING);
-        testNormalOperation(zkArr[stayingIndex2], zkArr[leavingIndex2]);
-        testServerHasConfig(zkArr[leavingIndex2], joiningServers, null);
-
-        closeAllHandles(zkArr);
-    }
-
-    @Test
-    public void testBulkReconfig() throws Exception {
-        qu = new QuorumUtil(3); // create 7 servers
-        qu.disableJMXTest = true;
-        qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-
-        // new config will have three of the servers as followers
-        // two of the servers as observers, and all ports different
-        ArrayList<String> newServers = new ArrayList<String>();
-        for (int i = 1; i <= 5; i++) {
-            String server = "server." + i + "=localhost:" + PortAssignment.unique()
-                    + ":" + PortAssignment.unique() + ":"
-                    + ((i == 4 || i == 5) ? "observer" : "participant")
-                    + ";localhost:" + qu.getPeer(i).peer.getClientPort();
-            newServers.add(server);
-        }
-
-        qu.shutdown(3);
-        qu.shutdown(6);
-        qu.shutdown(7);
-        
-        reconfig(zkArr[1], null, null, newServers, -1);
-        testNormalOperation(zkArr[1], zkArr[2]);
-       
-        testServerHasConfig(zkArr[1], newServers, null);
-        testServerHasConfig(zkArr[2], newServers, null);
-        testServerHasConfig(zkArr[4], newServers, null);
-        testServerHasConfig(zkArr[5], newServers, null);
-    
-        qu.shutdown(5);
-        qu.shutdown(4);
-        
-        testNormalOperation(zkArr[1], zkArr[2]);
-
-        closeAllHandles(zkArr);
-    }
-
-    @Test
-    public void testRemoveOneAsynchronous() throws Exception {
-        qu = new QuorumUtil(2); 
-        qu.disableJMXTest = true;
-        qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-
-        List<String> leavingServers = new ArrayList<String>();
-       
-        // let's remove someone who's not the leader
-        leavingServers.add(getLeaderId(qu) == 5 ? "4": "5");
- 
-        LinkedList<Integer> results = new LinkedList<Integer>();
-        
-        zkArr[1].reconfig(null, leavingServers, null, -1, this, results);   
-        
-        synchronized (results) {
-            while (results.size() < 1) {
-               results.wait();
-            }
-        }        
-        Assert.assertEquals(0, (int) results.get(0));
-        
-        testNormalOperation(zkArr[1], zkArr[2]);       
-        for (int i=1; i<=5; i++)
-            testServerHasConfig(zkArr[i], null, leavingServers);
-
-        closeAllHandles(zkArr);
-    }
-
-    @SuppressWarnings("unchecked")
-    public void processResult(int rc, String path, Object ctx, byte[] data,
-            Stat stat) {
-        synchronized(ctx) {
-            ((LinkedList<Integer>)ctx).add(rc);
-            ctx.notifyAll();
-        }
-    }
-    
-    
-    @Test
-    public void testRoleChange() throws Exception {
-        qu = new QuorumUtil(1); // create 3 servers
-        qu.disableJMXTest = true;
-        qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-
-        // changing a server's role / port is done by "adding" it with the same
-        // id but different role / port
-        List<String> joiningServers = new ArrayList<String>();
-
-        int leaderIndex = getLeaderId(qu);
-
-        // during first and second iteration, leavingIndex will correspond to a
-        // follower
-        // during third and fourth iteration leavingIndex will be the index of
-        // the leader
-        int changingIndex = (leaderIndex == 1) ? 2 : 1;
-
-        // first convert participant to observer, then observer to participant,
-        // and so on
-        String newRole = "observer";
-
-        for (int i = 0; i < 4; i++) {
-            // some of the operations will be executed by a client connected to
-            // the removed server
-            // while others are invoked by a client connected to some other
-            // server.
-            // when we're removing the leader, zk1 will be the client connected
-            // to removed server
-            ZooKeeper zk1 = (changingIndex == leaderIndex) ? zkArr[leaderIndex]
-                    : zkArr[(leaderIndex % qu.ALL) + 1];
-
-            // exactly as it is now, except for role change
-            joiningServers.add("server."
-                    + changingIndex
-                    + "=localhost:"
-                    + qu.getPeer(changingIndex).peer.getQuorumAddress()
-                            .getPort()
-                    + ":"
-                    + qu.getPeer(changingIndex).peer.getElectionAddress()
-                            .getPort() + ":" + newRole + ";localhost:"
-                    + qu.getPeer(changingIndex).peer.getClientPort());
-
-            reconfig(zk1, joiningServers, null, null, -1);
-            testNormalOperation(zkArr[changingIndex], zk1);
-
-            if (newRole.equals("observer")) {
-                Assert.assertTrue(qu.getPeer(changingIndex).peer.observer != null
-                        && qu.getPeer(changingIndex).peer.follower == null
-                        && qu.getPeer(changingIndex).peer.leader == null);
-                Assert.assertTrue(qu.getPeer(changingIndex).peer.getPeerState() == ServerState.OBSERVING);
-            } else {
-                Assert.assertTrue(qu.getPeer(changingIndex).peer.observer == null
-                        && (qu.getPeer(changingIndex).peer.follower != null || qu
-                                .getPeer(changingIndex).peer.leader != null));
-                Assert.assertTrue(qu.getPeer(changingIndex).peer.getPeerState() == ServerState.FOLLOWING
-                        || qu.getPeer(changingIndex).peer.getPeerState() == ServerState.LEADING);
-            }
-
-            joiningServers.clear();
-
-            if (newRole.equals("observer")) {
-                newRole = "participant";
-            } else {
-                // let's change leader to observer
-                newRole = "observer";
-                leaderIndex = getLeaderId(qu);
-                changingIndex = leaderIndex;
-            }
-        }
-        closeAllHandles(zkArr);
-    }
-
-    @Test
-    public void testPortChange() throws Exception {
-        qu = new QuorumUtil(1); // create 3 servers
-        qu.disableJMXTest = true;
-        qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-
-        List<String> joiningServers = new ArrayList<String>();
-
-        int leaderIndex = getLeaderId(qu);
-        int followerIndex = leaderIndex == 1 ? 2 : 1;
-
-        // change leader into observer, and modify all its ports at the same
-        // time
-        int observerIndex = leaderIndex;
-
-        // new ports
-        int port1 = PortAssignment.unique();
-        int port2 = PortAssignment.unique();
-        int port3 = PortAssignment.unique();
-        joiningServers.add("server." + observerIndex + "=localhost:" + port1
-                + ":" + port2 + ":observer;localhost:" + port3);
-
-        // create a /test znode and check that read/write works before
-        // any reconfig is invoked
-        testNormalOperation(zkArr[observerIndex], zkArr[followerIndex]);
-
-        reconfig(zkArr[followerIndex], joiningServers, null, null, -1);
-
-        // the change of port may not be immediate -- we repeatedly
-        // invoke an operation expecting it to eventually fail once
-        // the client port of observerIndex changes. If it didn't
-        // change -- that's an error.
-        try {
-          for (int i=0; i < 30; i++) {
-            Thread.sleep(1000);
-            zkArr[observerIndex].setData("/test", "teststr".getBytes(), -1);
-          }
-          Assert.fail("client port didn't change");
-        } catch (KeeperException.ConnectionLossException e) {
-            zkArr[observerIndex] = new ZooKeeper("127.0.0.1:"
-                    + qu.getPeer(observerIndex).peer.getClientPort(),
-                    ClientBase.CONNECTION_TIMEOUT, new Watcher() {
-                        public void process(WatchedEvent event) {}});
-        }
-
-        leaderIndex = getLeaderId(qu);
-
-        followerIndex = 1;
-        while (followerIndex == leaderIndex || followerIndex == observerIndex)
-            followerIndex++;
-
-        testNormalOperation(zkArr[observerIndex], zkArr[followerIndex]);
-
-        testServerHasConfig(zkArr[observerIndex], joiningServers, null);
-
-        Assert.assertTrue(qu.getPeer(observerIndex).peer.getQuorumAddress()
-                .getPort() == port1);
-        Assert.assertTrue(qu.getPeer(observerIndex).peer.getElectionAddress()
-                .getPort() == port2);
-        Assert.assertTrue(qu.getPeer(observerIndex).peer.getClientPort() == port3);
-        Assert.assertTrue(qu.getPeer(observerIndex).peer.getPeerState() == ServerState.OBSERVING);
-
-        joiningServers.clear();
-
-        // change leader's leading port - should renounce leadership
-
-        port1 = PortAssignment.unique();
-        joiningServers.add("server." + leaderIndex + "=localhost:" + port1
-                + ":"
-                + qu.getPeer(leaderIndex).peer.getElectionAddress().getPort()
-                + ":participant;localhost:"
-                + qu.getPeer(leaderIndex).peer.getClientPort());
-
-        reconfig(zkArr[followerIndex], joiningServers, null, null, -1);
-
-        testNormalOperation(zkArr[observerIndex], zkArr[followerIndex]);
-
-        Assert.assertTrue(qu.getPeer(leaderIndex).peer.getQuorumAddress()
-                .getPort() == port1);
-        Assert.assertTrue(qu.getPeer(leaderIndex).peer.leader == null
-                && qu.getPeer(leaderIndex).peer.follower != null);
-        Assert.assertTrue(qu.getPeer(followerIndex).peer.leader != null
-                && qu.getPeer(followerIndex).peer.follower == null);
-
-        joiningServers.clear();
-
-        // change in leader election port
-
-        for (int i = 1; i <= 3; i++) {
-            joiningServers.add("server." + i + "=localhost:"
-                    + qu.getPeer(i).peer.getQuorumAddress().getPort() + ":"
-                    + PortAssignment.unique() + ":participant;localhost:"
-                    + qu.getPeer(i).peer.getClientPort());
-        }
-
-        reconfig(zkArr[1], joiningServers, null, null, -1);
-
-        leaderIndex = getLeaderId(qu);
-        int follower1 = leaderIndex == 1 ? 2 : 1;
-        int follower2 = 1;
-        while (follower2 == leaderIndex || follower2 == follower1)
-            follower2++;
-
-        // let's kill the leader and see if a new one is elected
-
-        qu.shutdown(getLeaderId(qu));
-
-        testNormalOperation(zkArr[follower2], zkArr[follower1]);
-        testServerHasConfig(zkArr[follower1], joiningServers, null);
-        testServerHasConfig(zkArr[follower2], joiningServers, null);
-
-        closeAllHandles(zkArr);
-    }
-
-    @Test
-    public void testUnspecifiedClientAddress() throws Exception {
-    	int[] ports = new int[3];
-    	for (int port : ports) {
-    		port = PortAssignment.unique();
-    	}
-    	String server = "server.0=localhost:" + ports[0] + ":" + ports[1] + ";" + ports[2];
-    	QuorumServer qs = new QuorumServer(0, server);
-    	Assert.assertEquals(qs.clientAddr.getHostName(), "0.0.0.0");
-    	Assert.assertEquals(qs.clientAddr.getPort(), ports[2]);
-    }
-    
-    @Test
-    public void testQuorumSystemChange() throws Exception {
-        qu = new QuorumUtil(3); // create 7 servers
-        qu.disableJMXTest = true;
-        qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-
-        ArrayList<String> members = new ArrayList<String>();
-        members.add("group.1=3:4:5");
-        members.add("group.2=1:2");
-        members.add("weight.1=0");
-        members.add("weight.2=0");
-        members.add("weight.3=1");
-        members.add("weight.4=1");
-        members.add("weight.5=1");
-
-        for (int i = 1; i <= 5; i++) {
-            members.add("server." + i + "=127.0.0.1:"
-                    + qu.getPeer(i).peer.getQuorumAddress().getPort() + ":"
-                    + qu.getPeer(i).peer.getElectionAddress().getPort() + ";"
-                    + "127.0.0.1:" + qu.getPeer(i).peer.getClientPort());
-        }
-
-        reconfig(zkArr[1], null, null, members, -1);
-
-        // this should flush the config to servers 2, 3, 4 and 5
-        testNormalOperation(zkArr[2], zkArr[3]);
-        testNormalOperation(zkArr[4], zkArr[5]);
-
-        for (int i = 1; i <= 5; i++) {
-            if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumHierarchical))
-                Assert.fail("peer " + i
-                        + " doesn't think the quorum system is Hieararchical!");
-        }
-
-        qu.shutdown(1);
-        qu.shutdown(2);
-        qu.shutdown(3);
-        qu.shutdown(7);
-        qu.shutdown(6);
-
-        // servers 4 and 5 should be able to work independently
-        testNormalOperation(zkArr[4], zkArr[5]);
-
-        qu.restart(1);
-        qu.restart(2);
-
-        members.clear();
-        for (int i = 1; i <= 3; i++) {
-            members.add("server." + i + "=127.0.0.1:"
-                    + qu.getPeer(i).peer.getQuorumAddress().getPort() + ":"
-                    + qu.getPeer(i).peer.getElectionAddress().getPort() + ";"
-                    + "127.0.0.1:" + qu.getPeer(i).peer.getClientPort());
-        }
-
-        reconfig(zkArr[1], null, null, members, -1);
-
-        // flush the config to server 2
-        testNormalOperation(zkArr[1], zkArr[2]);
-
-        qu.shutdown(4);
-        qu.shutdown(5);
-
-        // servers 1 and 2 should be able to work independently
-        testNormalOperation(zkArr[1], zkArr[2]);
-
-        for (int i = 1; i <= 2; i++) {
-            if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumMaj))
-                Assert.fail("peer "
-                        + i
-                        + " doesn't think the quorum system is a majority quorum system!");
-        }
-
-        closeAllHandles(zkArr);
-    }
-    
-    @Test
-    public void testInitialConfigHasPositiveVersion() throws Exception {
-        qu = new QuorumUtil(1); // create 3 servers
-        qu.disableJMXTest = true;
-        qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        testNormalOperation(zkArr[1], zkArr[2]);
-        for (int i=1; i<4; i++) {
-            String configStr = testServerHasConfig(zkArr[i], null, null);
-            QuorumVerifier qv = qu.getPeer(i).peer.configFromString(configStr);
-            long version = qv.getVersion();
-            Assert.assertTrue(version == 0x100000000L);
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReconfigTest extends ZKTestCase implements DataCallback{
+    private static final Logger LOG = LoggerFactory
+            .getLogger(ReconfigTest.class);
+
+    private QuorumUtil qu;
+
+    /**
+     * Shuts down the quorum started by the test, if one was started.
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (qu == null) {
+            // the test never got as far as building a quorum
+            return;
+        }
+        qu.tearDown();
+    }
+
+    /**
+     * Submits a reconfig request through {@code zk} and checks the returned
+     * configuration against the request. The call is attempted up to 30
+     * times, sleeping one second after each connection loss; the test fails
+     * if the last attempt still loses the connection.
+     *
+     * @param zk client used to submit the request
+     * @param joiningServers server specs that must appear in the returned
+     *            config, or null
+     * @param leavingServers server ids; "server." followed by the id must
+     *            not appear in the returned config, or null
+     * @param newMembers passed through to {@code zk.reconfig}, may be null
+     * @param fromConfig passed through to {@code zk.reconfig}
+     * @return the configuration returned by the reconfig call, as a string
+     */
+    public static String reconfig(ZooKeeper zk, List<String> joiningServers,
+            List<String> leavingServers, List<String> newMembers, long fromConfig)
+            throws KeeperException, InterruptedException {
+        final int maxAttempts = 30;
+        byte[] rawConfig = null;
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            try {
+                rawConfig = zk.reconfig(joiningServers, leavingServers,
+                        newMembers, fromConfig, new Stat());
+                break;
+            } catch (KeeperException.ConnectionLossException e) {
+                if (attempt == maxAttempts) {
+                    // out of retries: we still can't reach the quorum
+                    Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
+                }
+                Thread.sleep(1000);
+            }
+        }
+
+        String configStr = new String(rawConfig);
+        if (joiningServers != null) {
+            for (String spec : joiningServers) {
+                Assert.assertTrue(configStr.contains(spec));
+            }
+        }
+        if (leavingServers != null) {
+            for (String id : leavingServers) {
+                String removedEntry = "server.".concat(id);
+                Assert.assertFalse(configStr.contains(removedEntry));
+            }
+        }
+
+        return configStr;
+    }
+
+    /**
+     * Reads the current configuration through {@code zk} (after issuing a
+     * sync on "/") and checks it against the expected membership. The read
+     * is attempted up to 30 times, sleeping one second after each connection
+     * loss; the test fails if the last attempt still loses the connection.
+     *
+     * @param zk client used to read the config
+     * @param joiningServers server specs that must appear in the config, or
+     *            null
+     * @param leavingServers server ids; "server." followed by the id must
+     *            not appear in the config, or null
+     * @return the configuration read through {@code zk}, as a string
+     */
+    public static String testServerHasConfig(ZooKeeper zk,
+            List<String> joiningServers, List<String> leavingServers)
+            throws KeeperException, InterruptedException {
+        final int maxAttempts = 30;
+        byte[] rawConfig = null;
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            try {
+                zk.sync("/", null, null);
+                rawConfig = zk.getConfig(false, new Stat());
+                break;
+            } catch (KeeperException.ConnectionLossException e) {
+                if (attempt == maxAttempts) {
+                    // out of retries: we still can't reach the quorum
+                    Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
+                }
+                Thread.sleep(1000);
+            }
+        }
+
+        String configStr = new String(rawConfig);
+        if (joiningServers != null) {
+            for (String spec : joiningServers) {
+                Assert.assertTrue(configStr.contains(spec));
+            }
+        }
+        if (leavingServers != null) {
+            for (String id : leavingServers) {
+                String removedEntry = "server.".concat(id);
+                Assert.assertFalse(configStr.contains(removedEntry));
+            }
+        }
+
+        return configStr;
+    }
+    
+    /**
+     * Checks that a write made through one client becomes readable through
+     * another: makes sure "/test" exists, writes a value with
+     * {@code writer}, then syncs and reads it back with {@code reader}.
+     * The whole sequence is attempted up to 30 times, sleeping one second
+     * after each connection loss; the test fails if the last attempt still
+     * loses the connection.
+     *
+     * @param writer client used to create and update "/test"
+     * @param reader client used to sync and read "/test"
+     */
+    public static void testNormalOperation(ZooKeeper writer, ZooKeeper reader)
+            throws KeeperException, InterruptedException {
+        final int maxAttempts = 30;
+        boolean znodeReady = false;
+
+        for (int attempt = 0; attempt < maxAttempts; attempt++) {
+            try {
+                if (!znodeReady) {
+                    try {
+                        writer.create("/test", "test".getBytes(),
+                                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    } catch (KeeperException.NodeExistsException e) {
+                        // the znode is already there, which is all we need
+                    }
+                    znodeReady = true;
+                }
+                String expected = "test" + attempt;
+                writer.setData("/test", expected.getBytes(), -1);
+                reader.sync("/", null, null);
+                byte[] stored = reader.getData("/test", null, new Stat());
+                Assert.assertEquals(expected, new String(stored));
+                // write and read both went through: done
+                return;
+            } catch (KeeperException.ConnectionLossException e) {
+                if (attempt == maxAttempts - 1) {
+                    // out of retries: we still can't reach the quorum
+                    Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
+                }
+                Thread.sleep(1000);
+            }
+        }
+    }
+    
+    /**
+     * Returns the smallest server id, counting up from 1, whose peer has a
+     * non-null {@code leader} field.
+     */
+    private int getLeaderId(QuorumUtil qu) {
+        for (int id = 1;; id++) {
+            if (qu.getPeer(id).peer.leader != null) {
+                return id;
+            }
+        }
+    }
+
+    /**
+     * Opens one client handle per server of {@code qu}, each connected to
+     * that server's client port on 127.0.0.1.
+     *
+     * @return an array of size {@code qu.ALL + 1} in which index i holds the
+     *         handle for server id i; index 0 is left null
+     */
+    private ZooKeeper[] createHandles(QuorumUtil qu) throws IOException {
+        // one extra slot, so that handles can be addressed by server id
+        // (1 to qu.ALL); slot 0 is never filled in
+        ZooKeeper[] handles = new ZooKeeper[qu.ALL + 1];
+        for (int id = 1; id <= qu.ALL; id++) {
+            String connectString = "127.0.0.1:"
+                    + qu.getPeer(id).peer.getClientPort();
+            // the tests don't react to session events, so ignore them
+            Watcher ignoreEvents = new Watcher() {
+                public void process(WatchedEvent event) {
+                }
+            };
+            handles[id] = new ZooKeeper(connectString,
+                    ClientBase.CONNECTION_TIMEOUT, ignoreEvents);
+        }
+        return handles;
+    }
+
+    /**
+     * Closes every non-null handle in {@code zkArr}, in array order.
+     */
+    private void closeAllHandles(ZooKeeper[] zkArr) throws InterruptedException {
+        for (int i = 0; i < zkArr.length; i++) {
+            ZooKeeper handle = zkArr[i];
+            if (handle == null) {
+                // unused slot
+                continue;
+            }
+            handle.close();
+        }
+    }
+
+ 
+    /**
+     * Removes one server from a 3-server ensemble and adds it back, twice:
+     * first a server that is not the leader, then the current leader. Each
+     * round also checks that a reconfig conditioned on a wrong config
+     * version is rejected with BadVersionException.
+     */
+    @Test
+    public void testRemoveAddOne() throws Exception {
+        qu = new QuorumUtil(1); // create 3 servers
+        qu.disableJMXTest = true;
+        qu.startAll();
+        ZooKeeper[] zkArr = createHandles(qu);
+
+        List<String> leavingServers = new ArrayList<String>();
+        List<String> joiningServers = new ArrayList<String>();
+
+        int leaderIndex = getLeaderId(qu);
+
+        // during first iteration, leavingIndex will correspond to a follower
+        // during second iteration leavingIndex will be the index of the leader
+        int leavingIndex = (leaderIndex == 1) ? 2 : 1;
+
+        for (int i = 0; i < 2; i++) {
+            // some of the operations will be executed by a client connected to
+            // the removed server
+            // while others are invoked by a client connected to some other
+            // server.
+            // when we're removing the leader, zk1 will be the client connected
+            // to removed server
+            ZooKeeper zk1 = (leavingIndex == leaderIndex) ? zkArr[leaderIndex]
+                    : zkArr[(leaderIndex % qu.ALL) + 1];
+            ZooKeeper zk2 = (leavingIndex == leaderIndex) ? zkArr[(leaderIndex % qu.ALL) + 1]
+                    : zkArr[leaderIndex];
+
+            leavingServers.add(Integer.toString(leavingIndex));
+
+            // remember this server so we can add it back later
+            joiningServers.add("server."
+                    + leavingIndex
+                    + "=localhost:"
+                    + qu.getPeer(leavingIndex).peer.getQuorumAddress()
+                            .getPort()
+                    + ":"
+                    + qu.getPeer(leavingIndex).peer.getElectionAddress()
+                            .getPort() + ":participant;localhost:"
+                    + qu.getPeer(leavingIndex).peer.getClientPort());
+
+            // remove the server; -1 means the request is not conditioned
+            // on a particular config version
+            String configStr = reconfig(zk1, null, leavingServers, null, -1);
+            testServerHasConfig(zk2, null, leavingServers);
+            testNormalOperation(zk2, zk1);
+
+            // version of the config returned by the removal above
+            QuorumVerifier qv = qu.getPeer(1).peer.configFromString(configStr);
+            long version = qv.getVersion();
+
+            // checks that conditioning on version works properly
+            try {
+                reconfig(zk2, joiningServers, null, null, version + 1);
+                Assert.fail("reconfig succeeded even though version condition was incorrect!");
+            } catch (KeeperException.BadVersionException e) {
+                // expected: version + 1 is not the current config version
+
+            }
+
+            // same request, this time conditioned on the right version
+            reconfig(zk2, joiningServers, null, null, version);
+
+            testNormalOperation(zk1, zk2);
+            testServerHasConfig(zk1, joiningServers, null);
+
+            // second iteration of the loop will remove the leader
+            // and add it back (as follower)
+            leavingIndex = leaderIndex = getLeaderId(qu);
+            leavingServers.clear();
+            joiningServers.clear();
+        }
+
+        closeAllHandles(zkArr);
+    }
+
+    /**
+     * Removes two servers from a 5-server ensemble and adds them back.
+     * <ol>
+     * <li>removes and adds back two servers (incl. the leader); one of the
+     * servers is added back as an observer</li>
+     * <li>tests that reconfig fails if a quorum of the new config is not up</li>
+     * <li>tests that a server that's not up during reconfig learns the new
+     * config when it comes up</li>
+     * </ol>
+     *
+     * @throws Exception on any unexpected failure
+     */
+    @Test
+    public void testRemoveAddTwo() throws Exception {
+        qu = new QuorumUtil(2); // create 5 servers
+        qu.disableJMXTest = true;
+        qu.startAll();
+        ZooKeeper[] zkArr = createHandles(qu);
+
+        List<String> leavingServers = new ArrayList<String>();
+        List<String> joiningServers = new ArrayList<String>();
+
+        int leaderIndex = getLeaderId(qu);
+
+        // lets remove the leader and some other server
+        int leavingIndex1 = leaderIndex;
+        int leavingIndex2 = (leaderIndex == 1) ? 2 : 1;
+
+        // find three distinct servers that are staying: each index starts
+        // at 1 and is advanced past the leaving servers and the staying
+        // servers already chosen
+        int stayingIndex1 = 1, stayingIndex2 = 1, stayingIndex3 = 1;
+        while (stayingIndex1 == leavingIndex1 || stayingIndex1 == leavingIndex2)
+            stayingIndex1++;
+
+        while (stayingIndex2 == leavingIndex1 || stayingIndex2 == leavingIndex2
+                || stayingIndex2 == stayingIndex1)
+            stayingIndex2++;
+
+        while (stayingIndex3 == leavingIndex1 || stayingIndex3 == leavingIndex2
+                || stayingIndex3 == stayingIndex1
+                || stayingIndex3 == stayingIndex2)
+            stayingIndex3++;
+
+        leavingServers.add(Integer.toString(leavingIndex1));
+        leavingServers.add(Integer.toString(leavingIndex2));
+
+        // remember these servers so we can add them back later
+        joiningServers.add("server." + leavingIndex1 + "=localhost:"
+                + qu.getPeer(leavingIndex1).peer.getQuorumAddress().getPort()
+                + ":"
+                + qu.getPeer(leavingIndex1).peer.getElectionAddress().getPort()
+                + ":participant;localhost:"
+                + qu.getPeer(leavingIndex1).peer.getClientPort());
+
+        // this server will be added back as an observer
+        joiningServers.add("server." + leavingIndex2 + "=localhost:"
+                + qu.getPeer(leavingIndex2).peer.getQuorumAddress().getPort()
+                + ":"
+                + qu.getPeer(leavingIndex2).peer.getElectionAddress().getPort()
+                + ":observer;localhost:"
+                + qu.getPeer(leavingIndex2).peer.getClientPort());
+
+        qu.shutdown(leavingIndex1);
+        qu.shutdown(leavingIndex2);
+
+        // 3 servers still up so this should work
+        reconfig(zkArr[stayingIndex2], null, leavingServers, null, -1);
+        
+        qu.shutdown(stayingIndex2);
+
+        // the following commands would not work in the original
+        // cluster of 5, but now that we've removed 2 servers
+        // we have a cluster of 3 servers and one of them is allowed to fail
+
+        testServerHasConfig(zkArr[stayingIndex1], null, leavingServers);
+        testServerHasConfig(zkArr[stayingIndex3], null, leavingServers);
+        testNormalOperation(zkArr[stayingIndex1], zkArr[stayingIndex3]);
+        
+        // this is a test that a reconfig will only succeed
+        // if there is a quorum up in new config. Below there is no
+        // quorum so it should fail
+        
+        // the sleep is necessary so that the leader figures out
+        // that the switched off servers are down
+        Thread.sleep(10000);
+
+        try {
+            reconfig(zkArr[stayingIndex1], joiningServers, null, null, -1);
+            Assert.fail("reconfig completed successfully even though there is no quorum up in new config!");
+        } catch (KeeperException.NewConfigNoQuorum e) {
+            // expected: only stayingIndex1 and stayingIndex3 are running
+
+        }
+        
+        // now start the third server so that new config has quorum
+        qu.restart(stayingIndex2);
+
+        reconfig(zkArr[stayingIndex1], joiningServers, null, null, -1);
+        testNormalOperation(zkArr[stayingIndex2], zkArr[stayingIndex3]);
+        testServerHasConfig(zkArr[stayingIndex2], joiningServers, null);
+
+        // this server wasn't around during the configuration change
+        // we should check that it is able to connect, finds out
+        // about the change and becomes an observer.
+
+        qu.restart(leavingIndex2);
+        Assert.assertTrue(qu.getPeer(leavingIndex2).peer.getPeerState() == ServerState.OBSERVING);
+        testNormalOperation(zkArr[stayingIndex2], zkArr[leavingIndex2]);
+        testServerHasConfig(zkArr[leavingIndex2], joiningServers, null);
+
+        closeAllHandles(zkArr);
+    }
+
+    /**
+     * Replaces the whole membership of a 7-server ensemble in a single
+     * reconfig call. The new membership lists servers 1-3 as participants
+     * and servers 4-5 as observers, all on freshly assigned quorum and
+     * election ports; servers 3, 6 and 7 are shut down before the call.
+     */
+    @Test
+    public void testBulkReconfig() throws Exception {
+        qu = new QuorumUtil(3); // create 7 servers
+        qu.disableJMXTest = true;
+        qu.startAll();
+        ZooKeeper[] handles = createHandles(qu);
+
+        // new config will have three of the servers as followers
+        // two of the servers as observers, and all ports different
+        ArrayList<String> newMembers = new ArrayList<String>();
+        for (int id = 1; id <= 5; id++) {
+            int quorumPort = PortAssignment.unique();
+            int electionPort = PortAssignment.unique();
+            String role = (id >= 4) ? "observer" : "participant";
+            newMembers.add("server." + id + "=localhost:" + quorumPort
+                    + ":" + electionPort + ":" + role
+                    + ";localhost:" + qu.getPeer(id).peer.getClientPort());
+        }
+
+        for (int id : new int[] { 3, 6, 7 }) {
+            qu.shutdown(id);
+        }
+
+        reconfig(handles[1], null, null, newMembers, -1);
+        testNormalOperation(handles[1], handles[2]);
+
+        // every server that is still running must report the new config
+        for (int id : new int[] { 1, 2, 4, 5 }) {
+            testServerHasConfig(handles[id], newMembers, null);
+        }
+
+        // stop the two observers; servers 1 and 2 must keep working
+        qu.shutdown(5);
+        qu.shutdown(4);
+
+        testNormalOperation(handles[1], handles[2]);
+
+        closeAllHandles(handles);
+    }
+
+    @Test
+    public void testRemoveOneAsynchronous() throws Exception {
+        qu = new QuorumUtil(2); 
+        qu.disableJMXTest = true;
+        qu.startAll();
+        ZooKeeper[] zkArr = createHandles(qu);
+
+        List<String> leavingServers = new ArrayList<String>();
+       
+        // lets remove someone who's not the leader
+        leavingServers.add(getLeaderId(qu) == 5 ? "4": "5");
+ 
+        LinkedList<Integer> results = new LinkedList<Integer>();
+        
+        zkArr[1].reconfig(null, leavingServers, null, -1, this, results);   
+        
+        synchronized (results) {
+            while (results.size() < 1) {
+               results.wait();
+            }
+        }        
+        Assert.assertEquals(0, (int) results.get(0));
+        
+        testNormalOperation(zkArr[1], zkArr[2]);       
+        for (int i=1; i<=5; i++)
+            testServerHasConfig(zkArr[i], null, leavingServers);
+
+        closeAllHandles(zkArr);
+    }
+
+    /**
+     * Callback for the asynchronous reconfig issued by
+     * testRemoveOneAsynchronous: appends {@code rc} to the list passed as
+     * {@code ctx} and wakes up every thread waiting on that list.
+     *
+     * @param rc result code to record
+     * @param path not used
+     * @param ctx the {@code LinkedList<Integer>} collecting result codes
+     * @param data not used
+     * @param stat not used
+     */
+    @SuppressWarnings("unchecked")
+    public void processResult(int rc, String path, Object ctx, byte[] data,
+            Stat stat) {
+        LinkedList<Integer> resultCodes = (LinkedList<Integer>) ctx;
+        synchronized (resultCodes) {
+            resultCodes.add(rc);
+            resultCodes.notifyAll();
+        }
+    }
+    
+    
+    /**
+     * Changes server roles through reconfig in a 3-server ensemble. Over
+     * four iterations a follower is turned into an observer and back into a
+     * participant, then the same is done with the leader. After each change
+     * the peer's observer/follower/leader fields and its peer state are
+     * checked against the new role.
+     */
+    @Test
+    public void testRoleChange() throws Exception {
+        qu = new QuorumUtil(1); // create 3 servers
+        qu.disableJMXTest = true;
+        qu.startAll();
+        ZooKeeper[] zkArr = createHandles(qu);
+
+        // changing a server's role / port is done by "adding" it with the same
+        // id but different role / port
+        List<String> joiningServers = new ArrayList<String>();
+
+        int leaderIndex = getLeaderId(qu);
+
+        // during first and second iteration, changingIndex will correspond
+        // to a follower
+        // during third and fourth iteration changingIndex will be the index
+        // of the leader
+        int changingIndex = (leaderIndex == 1) ? 2 : 1;
+
+        // first convert participant to observer, then observer to participant,
+        // and so on
+        String newRole = "observer";
+
+        for (int i = 0; i < 4; i++) {
+            // the reconfig is submitted through zk1, the read/write check
+            // below writes through the client of the changing server.
+            // when we're changing the leader, zk1 is the client connected
+            // to the changing server; otherwise it is connected to the
+            // server that follows the leader in id order
+            ZooKeeper zk1 = (changingIndex == leaderIndex) ? zkArr[leaderIndex]
+                    : zkArr[(leaderIndex % qu.ALL) + 1];
+
+            // exactly as it is now, except for role change
+            joiningServers.add("server."
+                    + changingIndex
+                    + "=localhost:"
+                    + qu.getPeer(changingIndex).peer.getQuorumAddress()
+                            .getPort()
+                    + ":"
+                    + qu.getPeer(changingIndex).peer.getElectionAddress()
+                            .getPort() + ":" + newRole + ";localhost:"
+                    + qu.getPeer(changingIndex).peer.getClientPort());
+
+            reconfig(zk1, joiningServers, null, null, -1);
+            testNormalOperation(zkArr[changingIndex], zk1);
+
+            if (newRole.equals("observer")) {
+                Assert.assertTrue(qu.getPeer(changingIndex).peer.observer != null
+                        && qu.getPeer(changingIndex).peer.follower == null
+                        && qu.getPeer(changingIndex).peer.leader == null);
+                Assert.assertTrue(qu.getPeer(changingIndex).peer.getPeerState() == ServerState.OBSERVING);
+            } else {
+                Assert.assertTrue(qu.getPeer(changingIndex).peer.observer == null
+                        && (qu.getPeer(changingIndex).peer.follower != null || qu
+                                .getPeer(changingIndex).peer.leader != null));
+                Assert.assertTrue(qu.getPeer(changingIndex).peer.getPeerState() == ServerState.FOLLOWING
+                        || qu.getPeer(changingIndex).peer.getPeerState() == ServerState.LEADING);
+            }
+
+            joiningServers.clear();
+
+            if (newRole.equals("observer")) {
+                // next iteration turns the same server back into a participant
+                newRole = "participant";
+            } else {
+                // lets change leader to observer
+                newRole = "observer";
+                leaderIndex = getLeaderId(qu);
+                changingIndex = leaderIndex;
+            }
+        }
+        closeAllHandles(zkArr);
+    }
+
+    /**
+     * Changes server ports through reconfig in a 3-server ensemble:
+     * <ol>
+     * <li>turns the leader into an observer while changing its quorum,
+     * election and client ports</li>
+     * <li>changes the quorum port of the current leader and checks that it
+     * ends up following while another server leads</li>
+     * <li>changes the election port of all three servers, shuts down the
+     * leader and checks that the two remaining servers still operate</li>
+     * </ol>
+     */
+    @Test
+    public void testPortChange() throws Exception {
+        qu = new QuorumUtil(1); // create 3 servers
+        qu.disableJMXTest = true;
+        qu.startAll();
+        ZooKeeper[] zkArr = createHandles(qu);
+
+        List<String> joiningServers = new ArrayList<String>();
+
+        int leaderIndex = getLeaderId(qu);
+        int followerIndex = leaderIndex == 1 ? 2 : 1;
+
+        // change leader into observer, and modify all its ports at the same
+        // time
+        int observerIndex = leaderIndex;
+
+        // new ports
+        int port1 = PortAssignment.unique();
+        int port2 = PortAssignment.unique();
+        int port3 = PortAssignment.unique();
+        joiningServers.add("server." + observerIndex + "=localhost:" + port1
+                + ":" + port2 + ":observer;localhost:" + port3);
+
+        // create a /test znode and check that read/write works before
+        // any reconfig is invoked
+        testNormalOperation(zkArr[observerIndex], zkArr[followerIndex]);
+
+        reconfig(zkArr[followerIndex], joiningServers, null, null, -1);
+
+        // the change of port may not be immediate -- we repeatedly
+        // invoke an operation expecting it to eventually fail once
+        // the client port of observerIndex changes. If it didn't
+        // change -- that's an error.
+        try {
+          for (int i=0; i < 30; i++) {
+            Thread.sleep(1000);
+            zkArr[observerIndex].setData("/test", "teststr".getBytes(), -1);
+          }
+          Assert.fail("client port didn't change");
+        } catch (KeeperException.ConnectionLossException e) {
+            // the old handle lost its connection: open a new one against
+            // the client port the peer reports now
+            zkArr[observerIndex] = new ZooKeeper("127.0.0.1:"
+                    + qu.getPeer(observerIndex).peer.getClientPort(),
+                    ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+                        public void process(WatchedEvent event) {}});
+        }
+
+        leaderIndex = getLeaderId(qu);
+
+        // the one server that is neither the leader nor the observer
+        followerIndex = 1;
+        while (followerIndex == leaderIndex || followerIndex == observerIndex)
+            followerIndex++;
+
+        testNormalOperation(zkArr[observerIndex], zkArr[followerIndex]);
+
+        testServerHasConfig(zkArr[observerIndex], joiningServers, null);
+
+        Assert.assertTrue(qu.getPeer(observerIndex).peer.getQuorumAddress()
+                .getPort() == port1);
+        Assert.assertTrue(qu.getPeer(observerIndex).peer.getElectionAddress()
+                .getPort() == port2);
+        Assert.assertTrue(qu.getPeer(observerIndex).peer.getClientPort() == port3);
+        Assert.assertTrue(qu.getPeer(observerIndex).peer.getPeerState() == ServerState.OBSERVING);
+
+        joiningServers.clear();
+
+        // change leader's leading port - should renounce leadership
+
+        port1 = PortAssignment.unique();
+        joiningServers.add("server." + leaderIndex + "=localhost:" + port1
+                + ":"
+                + qu.getPeer(leaderIndex).peer.getElectionAddress().getPort()
+                + ":participant;localhost:"
+                + qu.getPeer(leaderIndex).peer.getClientPort());
+
+        reconfig(zkArr[followerIndex], joiningServers, null, null, -1);
+
+        testNormalOperation(zkArr[observerIndex], zkArr[followerIndex]);
+
+        // the former leader uses the new port and now follows, while the
+        // former follower leads
+        Assert.assertTrue(qu.getPeer(leaderIndex).peer.getQuorumAddress()
+                .getPort() == port1);
+        Assert.assertTrue(qu.getPeer(leaderIndex).peer.leader == null
+                && qu.getPeer(leaderIndex).peer.follower != null);
+        Assert.assertTrue(qu.getPeer(followerIndex).peer.leader != null
+                && qu.getPeer(followerIndex).peer.follower == null);
+
+        joiningServers.clear();
+
+        // change in leader election port
+
+        for (int i = 1; i <= 3; i++) {
+            joiningServers.add("server." + i + "=localhost:"
+                    + qu.getPeer(i).peer.getQuorumAddress().getPort() + ":"
+                    + PortAssignment.unique() + ":participant;localhost:"
+                    + qu.getPeer(i).peer.getClientPort());
+        }
+
+        reconfig(zkArr[1], joiningServers, null, null, -1);
+
+        // pick the two servers other than the current leader
+        leaderIndex = getLeaderId(qu);
+        int follower1 = leaderIndex == 1 ? 2 : 1;
+        int follower2 = 1;
+        while (follower2 == leaderIndex || follower2 == follower1)
+            follower2++;
+
+        // lets kill the leader and see if a new one is elected
+
+        qu.shutdown(getLeaderId(qu));
+
+        testNormalOperation(zkArr[follower2], zkArr[follower1]);
+        testServerHasConfig(zkArr[follower1], joiningServers, null);
+        testServerHasConfig(zkArr[follower2], joiningServers, null);
+
+        closeAllHandles(zkArr);
+    }
+
+    /**
+     * Checks that a server spec whose client part consists of a port only
+     * (no host) yields a client address with host "0.0.0.0" and that port.
+     */
+    @Test
+    public void testUnspecifiedClientAddress() throws Exception {
+        // Fill the array by index. Assigning to the variable of a for-each
+        // loop only changes the loop's local copy, which would leave every
+        // entry at 0 and make the port check below meaningless.
+        int[] ports = new int[3];
+        for (int i = 0; i < ports.length; i++) {
+            ports[i] = PortAssignment.unique();
+        }
+        String server = "server.0=localhost:" + ports[0] + ":" + ports[1] + ";" + ports[2];
+        QuorumServer qs = new QuorumServer(0, server);
+        // assertEquals takes (expected, actual)
+        Assert.assertEquals("0.0.0.0", qs.clientAddr.getHostName());
+        Assert.assertEquals(ports[2], qs.clientAddr.getPort());
+    }
+    
+    /**
+     * Switches the quorum system of a 7-server ensemble through reconfig.
+     * First the membership becomes servers 1-5 under a hierarchical quorum
+     * (group 1 = servers 3, 4, 5; group 2 = servers 1, 2; weight 0 for
+     * servers 1 and 2, weight 1 for servers 3-5). Then it becomes servers
+     * 1-3 with no groups or weights. After each change the type of the
+     * peers' quorum verifier is checked.
+     */
+    @Test
+    public void testQuorumSystemChange() throws Exception {
+        qu = new QuorumUtil(3); // create 7 servers
+        qu.disableJMXTest = true;
+        qu.startAll();
+        ZooKeeper[] zkArr = createHandles(qu);
+
+        // group and weight definitions for the hierarchical quorum
+        ArrayList<String> members = new ArrayList<String>();
+        members.add("group.1=3:4:5");
+        members.add("group.2=1:2");
+        members.add("weight.1=0");
+        members.add("weight.2=0");
+        members.add("weight.3=1");
+        members.add("weight.4=1");
+        members.add("weight.5=1");
+
+        // servers 1-5 keep the ports they are currently using
+        for (int i = 1; i <= 5; i++) {
+            members.add("server." + i + "=127.0.0.1:"
+                    + qu.getPeer(i).peer.getQuorumAddress().getPort() + ":"
+                    + qu.getPeer(i).peer.getElectionAddress().getPort() + ";"
+                    + "127.0.0.1:" + qu.getPeer(i).peer.getClientPort());
+        }
+
+        reconfig(zkArr[1], null, null, members, -1);
+
+        // this should flush the config to servers 2, 3, 4 and 5
+        testNormalOperation(zkArr[2], zkArr[3]);
+        testNormalOperation(zkArr[4], zkArr[5]);
+
+        for (int i = 1; i <= 5; i++) {
+            if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumHierarchical))
+                Assert.fail("peer " + i
+                        + " doesn't think the quorum system is Hieararchical!");
+        }
+
+        qu.shutdown(1);
+        qu.shutdown(2);
+        qu.shutdown(3);
+        qu.shutdown(7);
+        qu.shutdown(6);
+
+        // servers 4 and 5 should be able to work independently
+        testNormalOperation(zkArr[4], zkArr[5]);
+
+        qu.restart(1);
+        qu.restart(2);
+
+        // second membership: servers 1-3 only, no groups or weights
+        members.clear();
+        for (int i = 1; i <= 3; i++) {
+            members.add("server." + i + "=127.0.0.1:"
+                    + qu.getPeer(i).peer.getQuorumAddress().getPort() + ":"
+                    + qu.getPeer(i).peer.getElectionAddress().getPort() + ";"
+                    + "127.0.0.1:" + qu.getPeer(i).peer.getClientPort());
+        }
+
+        reconfig(zkArr[1], null, null, members, -1);
+
+        // flush the config to server 2
+        testNormalOperation(zkArr[1], zkArr[2]);
+
+        qu.shutdown(4);
+        qu.shutdown(5);
+
+        // servers 1 and 2 should be able to work independently
+        testNormalOperation(zkArr[1], zkArr[2]);
+
+        for (int i = 1; i <= 2; i++) {
+            if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumMaj))
+                Assert.fail("peer "
+                        + i
+                        + " doesn't think the quorum system is a majority quorum system!");
+        }
+
+        closeAllHandles(zkArr);
+    }
+    
+    /**
+     * Checks that each server of a freshly started 3-server ensemble reports
+     * an initial config whose version is 0x100000000.
+     */
+    @Test
+    public void testInitialConfigHasPositiveVersion() throws Exception {
+        qu = new QuorumUtil(1); // create 3 servers
+        qu.disableJMXTest = true;
+        qu.startAll();
+        ZooKeeper[] zkArr = createHandles(qu);
+        testNormalOperation(zkArr[1], zkArr[2]);
+        for (int i = 1; i < 4; i++) {
+            String configStr = testServerHasConfig(zkArr[i], null, null);
+            QuorumVerifier qv = qu.getPeer(i).peer.configFromString(configStr);
+            long version = qv.getVersion();
+            // assertEquals reports the actual version when the check fails
+            Assert.assertEquals(0x100000000L, version);
+        }
+
+        // release the client handles, as the other quorum tests do
+        closeAllHandles(zkArr);
+    }
+}

Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReconfigTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionInvalidationTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionInvalidationTest.java?rev=1584497&r1=1584496&r2=1584497&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionInvalidationTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionInvalidationTest.java Fri Apr  4 01:24:37 2014
@@ -1,105 +1,105 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import junit.framework.Assert;
-
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.proto.ConnectRequest;
-import org.apache.zookeeper.proto.CreateRequest;
-import org.apache.zookeeper.proto.RequestHeader;
-import org.junit.Test;
-
-public class SessionInvalidationTest extends ClientBase {
-    /**
-     * Test solution for ZOOKEEPER-1208. Verify that operations are not
-     * accepted after a close session.
-     * 
-     * We're using our own marshalling here in order to force an operation
-     * after the session is closed (ZooKeeper.class will not allow this). Also
-     * by filling the pipe with operations it increases the likelihood that
-     * the server will process the create before FinalRequestProcessor
-     * removes the session from the tracker.
-     */
-    @Test
-    public void testCreateAfterCloseShouldFail() throws Exception {
-        for (int i = 0; i < 10; i++) {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-
-            // open a connection
-            boa.writeInt(44, "len");
-            ConnectRequest conReq = new ConnectRequest(0, 0, 30000, 0, new byte[16]);
-            conReq.serialize(boa, "connect");
-
-            // close connection
-            boa.writeInt(8, "len");
-            RequestHeader h = new RequestHeader(1, ZooDefs.OpCode.closeSession);
-            h.serialize(boa, "header");
-
-            // create ephemeral znode
-            boa.writeInt(52, "len"); // We'll fill this in later
-            RequestHeader header = new RequestHeader(2, OpCode.create);
-            header.serialize(boa, "header");
-            CreateRequest createReq = new CreateRequest("/foo" + i, new byte[0],
-                    Ids.OPEN_ACL_UNSAFE, 1);
-            createReq.serialize(boa, "request");
-            baos.close();
-            
-            System.out.println("Length:" + baos.toByteArray().length);
-            
-            String hp[] = hostPort.split(":");
-            Socket sock = new Socket(hp[0], Integer.parseInt(hp[1]));
-            InputStream resultStream = null;
-            try {
-                OutputStream outstream = sock.getOutputStream();
-                byte[] data = baos.toByteArray();
-                outstream.write(data);
-                outstream.flush();
-                
-                resultStream = sock.getInputStream();
-                byte[] b = new byte[10000];
-                int len;
-                while ((len = resultStream.read(b)) >= 0) {
-                    // got results
-                    System.out.println("gotlen:" + len);
-                }
-            } finally {
-                if (resultStream != null) {
-                    resultStream.close();
-                }
-                sock.close();
-            }
-        }
-        
-        ZooKeeper zk = createClient();
-        Assert.assertEquals(1, zk.getChildren("/", false).size());
-
-        zk.close();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import junit.framework.Assert;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.proto.ConnectRequest;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.RequestHeader;
+import org.junit.Test;
+
+public class SessionInvalidationTest extends ClientBase {
+    /**
+     * Test solution for ZOOKEEPER-1208. Verify that operations are not
+     * accepted after a close session.
+     * 
+     * We're using our own marshalling here in order to force an operation
+     * after the session is closed (ZooKeeper.class will not allow this). Also
+     * by filling the pipe with operations it increases the likelihood that
+     * the server will process the create before FinalRequestProcessor
+     * removes the session from the tracker.
+     */
+    @Test
+    public void testCreateAfterCloseShouldFail() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+
+            // open a connection
+            boa.writeInt(44, "len");
+            ConnectRequest conReq = new ConnectRequest(0, 0, 30000, 0, new byte[16]);
+            conReq.serialize(boa, "connect");
+
+            // close connection
+            boa.writeInt(8, "len");
+            RequestHeader h = new RequestHeader(1, ZooDefs.OpCode.closeSession);
+            h.serialize(boa, "header");
+
+            // create ephemeral znode
+            boa.writeInt(52, "len"); // We'll fill this in later
+            RequestHeader header = new RequestHeader(2, OpCode.create);
+            header.serialize(boa, "header");
+            CreateRequest createReq = new CreateRequest("/foo" + i, new byte[0],
+                    Ids.OPEN_ACL_UNSAFE, 1);
+            createReq.serialize(boa, "request");
+            baos.close();
+            
+            System.out.println("Length:" + baos.toByteArray().length);
+            
+            String hp[] = hostPort.split(":");
+            Socket sock = new Socket(hp[0], Integer.parseInt(hp[1]));
+            InputStream resultStream = null;
+            try {
+                OutputStream outstream = sock.getOutputStream();
+                byte[] data = baos.toByteArray();
+                outstream.write(data);
+                outstream.flush();
+                
+                resultStream = sock.getInputStream();
+                byte[] b = new byte[10000];
+                int len;
+                while ((len = resultStream.read(b)) >= 0) {
+                    // got results
+                    System.out.println("gotlen:" + len);
+                }
+            } finally {
+                if (resultStream != null) {
+                    resultStream.close();
+                }
+                sock.close();
+            }
+        }
+        
+        ZooKeeper zk = createClient();
+        Assert.assertEquals(1, zk.getChildren("/", false).size());
+
+        zk.close();
+    }
+}

Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionInvalidationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message