zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [03/51] [partial] zookeeper git commit: ZOOKEEPER-3032: MAVEN MIGRATION - zookeeper-server
Date Fri, 19 Oct 2018 12:40:01 GMT
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
new file mode 100644
index 0000000..fcaa9b6
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.util.OSMXBean;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class QuorumBase extends ClientBase {
+    private static final Logger LOG = LoggerFactory.getLogger(QuorumBase.class);
+
+    private static final String LOCALADDR = "127.0.0.1";
+
+    File s1dir, s2dir, s3dir, s4dir, s5dir;
+    QuorumPeer s1, s2, s3, s4, s5;
+    protected int port1;
+    protected int port2;
+    protected int port3;
+    protected int port4;
+    protected int port5;
+
+    protected int portLE1;
+    protected int portLE2;
+    protected int portLE3;
+    protected int portLE4;
+    protected int portLE5;
+
+    protected int portClient1;
+    protected int portClient2;
+    protected int portClient3;
+    protected int portClient4;
+    protected int portClient5;
+
+    protected boolean localSessionsEnabled = false;
+    protected boolean localSessionsUpgradingEnabled = false;
+
+    @Test
+    // This just avoids complaints by junit
+    public void testNull() {
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        setUp(false);
+    }
+
+    protected void setUp(boolean withObservers) throws Exception {
+        LOG.info("QuorumBase.setup " + getTestName());
+        setupTestEnv();
+
+        JMXEnv.setUp();
+
+        setUpAll();
+
+        port1 = PortAssignment.unique();
+        port2 = PortAssignment.unique();
+        port3 = PortAssignment.unique();
+        port4 = PortAssignment.unique();
+        port5 = PortAssignment.unique();
+
+        portLE1 = PortAssignment.unique();
+        portLE2 = PortAssignment.unique();
+        portLE3 = PortAssignment.unique();
+        portLE4 = PortAssignment.unique();
+        portLE5 = PortAssignment.unique();
+
+        portClient1 = PortAssignment.unique();
+        portClient2 = PortAssignment.unique();
+        portClient3 = PortAssignment.unique();
+        portClient4 = PortAssignment.unique();
+        portClient5 = PortAssignment.unique();
+
+        hostPort = "127.0.0.1:" + portClient1
+            + ",127.0.0.1:" + portClient2
+            + ",127.0.0.1:" + portClient3
+            + ",127.0.0.1:" + portClient4
+            + ",127.0.0.1:" + portClient5;
+        LOG.info("Ports are: " + hostPort);
+
+        s1dir = ClientBase.createTmpDir();
+        s2dir = ClientBase.createTmpDir();
+        s3dir = ClientBase.createTmpDir();
+        s4dir = ClientBase.createTmpDir();
+        s5dir = ClientBase.createTmpDir();
+
+        startServers(withObservers);
+
+        OSMXBean osMbean = new OSMXBean();
+        if (osMbean.getUnix() == true) {
+            LOG.info("Initial fdcount is: "
+                    + osMbean.getOpenFileDescriptorCount());
+        }
+
+        LOG.info("Setup finished");
+    }
+
+    void startServers() throws Exception {
+        startServers(false);
+    }
+
+    void startServers(boolean withObservers) throws Exception {
+        int tickTime = 2000;
+        int initLimit = 3;
+        int syncLimit = 3;
+        Map<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+        peers.put(Long.valueOf(1), new QuorumServer(1,
+                new InetSocketAddress(LOCALADDR, port1),
+                new InetSocketAddress(LOCALADDR, portLE1),
+                new InetSocketAddress(LOCALADDR, portClient1),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(2), new QuorumServer(2,
+                new InetSocketAddress(LOCALADDR, port2),
+                new InetSocketAddress(LOCALADDR, portLE2),
+                new InetSocketAddress(LOCALADDR, portClient2),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(3), new QuorumServer(3,
+                new InetSocketAddress(LOCALADDR, port3),
+                new InetSocketAddress(LOCALADDR, portLE3),
+                new InetSocketAddress(LOCALADDR, portClient3),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(4), new QuorumServer(4,
+                new InetSocketAddress(LOCALADDR, port4),
+                new InetSocketAddress(LOCALADDR, portLE4),
+                new InetSocketAddress(LOCALADDR, portClient4),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(5), new QuorumServer(5,
+                new InetSocketAddress(LOCALADDR, port5),
+                new InetSocketAddress(LOCALADDR, portLE5),
+                new InetSocketAddress(LOCALADDR, portClient5),
+                LearnerType.PARTICIPANT));
+
+        if (withObservers) {
+            peers.get(Long.valueOf(4)).type = LearnerType.OBSERVER;
+            peers.get(Long.valueOf(5)).type = LearnerType.OBSERVER;
+        }
+
+        LOG.info("creating QuorumPeer 1 port " + portClient1);
+        s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit);
+        Assert.assertEquals(portClient1, s1.getClientPort());
+        LOG.info("creating QuorumPeer 2 port " + portClient2);
+        s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit);
+        Assert.assertEquals(portClient2, s2.getClientPort());
+        LOG.info("creating QuorumPeer 3 port " + portClient3);
+        s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit);
+        Assert.assertEquals(portClient3, s3.getClientPort());
+        LOG.info("creating QuorumPeer 4 port " + portClient4);
+        s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit);
+        Assert.assertEquals(portClient4, s4.getClientPort());
+        LOG.info("creating QuorumPeer 5 port " + portClient5);
+        s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit);
+        Assert.assertEquals(portClient5, s5.getClientPort());
+
+        if (withObservers) {
+            s4.setLearnerType(LearnerType.OBSERVER);
+            s5.setLearnerType(LearnerType.OBSERVER);
+        }
+
+        LOG.info("QuorumPeer 1 voting view: " + s1.getVotingView());
+        LOG.info("QuorumPeer 2 voting view: " + s2.getVotingView());
+        LOG.info("QuorumPeer 3 voting view: " + s3.getVotingView());
+        LOG.info("QuorumPeer 4 voting view: " + s4.getVotingView());
+        LOG.info("QuorumPeer 5 voting view: " + s5.getVotingView());
+
+        s1.enableLocalSessions(localSessionsEnabled);
+        s2.enableLocalSessions(localSessionsEnabled);
+        s3.enableLocalSessions(localSessionsEnabled);
+        s4.enableLocalSessions(localSessionsEnabled);
+        s5.enableLocalSessions(localSessionsEnabled);
+        s1.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s2.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s3.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s4.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s5.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+
+        LOG.info("start QuorumPeer 1");
+        s1.start();
+        LOG.info("start QuorumPeer 2");
+        s2.start();
+        LOG.info("start QuorumPeer 3");
+        s3.start();
+        LOG.info("start QuorumPeer 4");
+        s4.start();
+        LOG.info("start QuorumPeer 5");
+        s5.start();
+        LOG.info("started QuorumPeer 5");
+
+        LOG.info ("Checking ports " + hostPort);
+        for (String hp : hostPort.split(",")) {
+            Assert.assertTrue("waiting for server up",
+                       ClientBase.waitForServerUp(hp,
+                                    CONNECTION_TIMEOUT));
+            LOG.info(hp + " is accepting client connections");
+        }
+
+        // interesting to see what's there...
+        JMXEnv.dump();
+        // make sure we have these 5 servers listed
+        Set<String> ensureNames = new LinkedHashSet<String>();
+        for (int i = 1; i <= 5; i++) {
+            ensureNames.add("InMemoryDataTree");
+        }
+        for (int i = 1; i <= 5; i++) {
+            ensureNames.add("name0=ReplicatedServer_id" + i
+                 + ",name1=replica." + i + ",name2=");
+        }
+        for (int i = 1; i <= 5; i++) {
+            for (int j = 1; j <= 5; j++) {
+                ensureNames.add("name0=ReplicatedServer_id" + i
+                     + ",name1=replica." + j);
+            }
+        }
+        for (int i = 1; i <= 5; i++) {
+            ensureNames.add("name0=ReplicatedServer_id" + i);
+        }
+        JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
+    }
+
+    /**
+     * Locates the peer that is currently in the LEADING state.
+     * @return zero-based position of the first leading peer in the order
+     *         s1..s5 (0 for s1, 4 for s5), or -1 when none is leading
+     */
+    public int getLeaderIndex() {
+        QuorumPeer[] ensemble = { s1, s2, s3, s4, s5 };
+        for (int idx = 0; idx < ensemble.length; idx++) {
+            if (ensemble[idx].getPeerState() == ServerState.LEADING) {
+                return idx;
+            }
+        }
+        return -1;
+    }
+
+    public String getPeersMatching(ServerState state) {
+        StringBuilder hosts = new StringBuilder();
+        for (QuorumPeer p : getPeerList()) {
+            if (p.getPeerState() == state) {
+                hosts.append(String.format("%s:%d,", LOCALADDR, p.getClientAddress().getPort()));
+            }
+        }
+        LOG.info("getPeersMatching ports are {}", hosts);
+        return hosts.toString();
+    }
+
+    public ArrayList<QuorumPeer> getPeerList() {
+        ArrayList<QuorumPeer> peers = new ArrayList<QuorumPeer>();
+        peers.add(s1);
+        peers.add(s2);
+        peers.add(s3);
+        peers.add(s4);
+        peers.add(s5);
+        return peers;
+    }
+
+    public void setupServers() throws IOException {
+        setupServer(1);
+        setupServer(2);
+        setupServer(3);
+        setupServer(4);
+        setupServer(5);
+    }
+
+    Map<Long,QuorumServer> peers = null;
+    public void setupServer(int i) throws IOException {
+        int tickTime = 2000;
+        int initLimit = 3;
+        int syncLimit = 3;
+
+        if(peers == null){
+            peers = new HashMap<Long,QuorumServer>();
+
+            peers.put(Long.valueOf(1), new QuorumServer(1,
+                new InetSocketAddress(LOCALADDR, port1),
+                new InetSocketAddress(LOCALADDR, portLE1),
+                new InetSocketAddress(LOCALADDR, portClient1),
+                LearnerType.PARTICIPANT));
+            peers.put(Long.valueOf(2), new QuorumServer(2,
+                new InetSocketAddress(LOCALADDR, port2),
+                new InetSocketAddress(LOCALADDR, portLE2),
+                new InetSocketAddress(LOCALADDR, portClient2),
+                LearnerType.PARTICIPANT));
+            peers.put(Long.valueOf(3), new QuorumServer(3,
+                new InetSocketAddress(LOCALADDR, port3),
+                new InetSocketAddress(LOCALADDR, portLE3),
+                new InetSocketAddress(LOCALADDR, portClient3),
+                LearnerType.PARTICIPANT));
+            peers.put(Long.valueOf(4), new QuorumServer(4,
+                new InetSocketAddress(LOCALADDR, port4),
+                new InetSocketAddress(LOCALADDR, portLE4),
+                new InetSocketAddress(LOCALADDR, portClient4),
+                LearnerType.PARTICIPANT));
+            peers.put(Long.valueOf(5), new QuorumServer(5,
+                new InetSocketAddress(LOCALADDR, port5),
+                new InetSocketAddress(LOCALADDR, portLE5),
+                new InetSocketAddress(LOCALADDR, portClient5),
+                LearnerType.PARTICIPANT));
+        }
+
+        switch(i){
+        case 1:
+            LOG.info("creating QuorumPeer 1 port " + portClient1);
+            s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit);
+            Assert.assertEquals(portClient1, s1.getClientPort());
+            break;
+        case 2:
+            LOG.info("creating QuorumPeer 2 port " + portClient2);
+            s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit);
+            Assert.assertEquals(portClient2, s2.getClientPort());
+            break;
+        case 3:
+            LOG.info("creating QuorumPeer 3 port " + portClient3);
+            s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit);
+            Assert.assertEquals(portClient3, s3.getClientPort());
+            break;
+        case 4:
+            LOG.info("creating QuorumPeer 4 port " + portClient4);
+            s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit);
+            Assert.assertEquals(portClient4, s4.getClientPort());
+            break;
+        case 5:
+            LOG.info("creating QuorumPeer 5 port " + portClient5);
+            s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit);
+            Assert.assertEquals(portClient5, s5.getClientPort());
+        }
+    }
+
+    /**
+     * Logs the open file-descriptor count when {@code OSMXBean.getUnix()} is true, shuts the
+     * peers down, asserts waitForServerDown for every hostPort entry, then tears down JMXEnv.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        LOG.info("TearDown started");
+
+        OSMXBean platformInfo = new OSMXBean();
+        if (platformInfo.getUnix()) {
+            LOG.info("fdcount after test is: " + platformInfo.getOpenFileDescriptorCount());
+        }
+
+        shutdownServers();
+        for (String endpoint : hostPort.split(",")) {
+            boolean stopped = ClientBase.waitForServerDown(endpoint, ClientBase.CONNECTION_TIMEOUT);
+            Assert.assertTrue("waiting for server down", stopped);
+            LOG.info(endpoint + " is no longer accepting client connections");
+        }
+        JMXEnv.tearDown();
+    }
+    public void shutdownServers() {
+        shutdown(s1);
+        shutdown(s2);
+        shutdown(s3);
+        shutdown(s4);
+        shutdown(s5);
+    }
+
+    /** Stops the peer and its election, then waits (bounded) for the peer thread to exit; null is a no-op. */
+    public static void shutdown(QuorumPeer qp) {
+        if (qp == null) {
+            return;
+        }
+        try {
+            LOG.info("Shutting down quorum peer " + qp.getName());
+            qp.shutdown();
+            Election e = qp.getElectionAlg();
+            if (e != null) {
+                LOG.info("Shutting down leader election " + qp.getName());
+                e.shutdown();
+            } else {
+                LOG.info("No election available to shutdown " + qp.getName());
+            }
+            LOG.info("Waiting for " + qp.getName() + " to exit thread");
+            long readTimeout = qp.getTickTime() * qp.getInitLimit();
+            long connectTimeout = qp.getTickTime() * qp.getSyncLimit();
+            // join() takes milliseconds; wait twice the largest of the three timeouts
+            long joinMillis = 2 * Math.max(Math.max(readTimeout, connectTimeout), ClientBase.CONNECTION_TIMEOUT);
+            qp.join(joinMillis);
+            if (qp.isAlive()) {
+                Assert.fail("QP failed to shutdown in " + joinMillis + " milliseconds: " + qp.getName());
+            }
+        } catch (InterruptedException e) {
+            LOG.debug("QP interrupted: " + qp.getName(), e);
+        }
+    }
+
+    protected TestableZooKeeper createClient()
+        throws IOException, InterruptedException
+    {
+        return createClient(hostPort);
+    }
+
+    protected TestableZooKeeper createClient(String hp)
+        throws IOException, InterruptedException
+    {
+        CountdownWatcher watcher = new CountdownWatcher();
+        return createClient(watcher, hp);
+    }
+
+    protected TestableZooKeeper createClient(CountdownWatcher watcher, ServerState state)
+        throws IOException, InterruptedException
+    {
+        return createClient(watcher, getPeersMatching(state));
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumHammerTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumHammerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumHammerTest.java
new file mode 100644
index 0000000..e5b377e
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumHammerTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QuorumHammerTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(QuorumHammerTest.class);
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+
+    protected final QuorumBase qb = new QuorumBase();
+    protected final ClientHammerTest cht = new ClientHammerTest();
+
+    @Before
+    public void setUp() throws Exception {
+        qb.setUp();
+        cht.hostPort = qb.hostPort;
+        cht.setUpAll();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cht.tearDownAll();
+        qb.tearDown();
+    }
+
+    @Test
+    public void testHammerBasic() throws Throwable {
+        cht.testHammerBasic();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java
new file mode 100644
index 0000000..6966626
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+import java.util.ArrayList;
+
+import org.apache.zookeeper.jmx.CommonNames;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QuorumMajorityTest extends QuorumBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(QuorumMajorityTest.class);
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+
+    /***************************************************************/
+    /* Test that the majority quorum verifier only counts votes from */
+    /* followers in its view                                    */
+    /***************************************************************/
+    @Test
+    public void testMajQuorums() throws Throwable {
+        LOG.info("Verify QuorumPeer#electionTimeTaken jmx bean attribute");
+
+        ArrayList<QuorumPeer> peers = getPeerList();
+        for (int i = 1; i <= peers.size(); i++) {
+            QuorumPeer qp = peers.get(i - 1);
+            Long electionTimeTaken = -1L;
+            String bean = "";
+            if (qp.getPeerState() == ServerState.FOLLOWING) {
+                bean = String.format(
+                        "%s:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Follower",
+                        CommonNames.DOMAIN, i, i);
+            } else if (qp.getPeerState() == ServerState.LEADING) {
+                bean = String.format(
+                        "%s:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Leader",
+                        CommonNames.DOMAIN, i, i);
+            }
+            electionTimeTaken = (Long) JMXEnv.ensureBeanAttribute(bean,
+                    "ElectionTimeTaken");
+            Assert.assertTrue("Wrong electionTimeTaken value!",
+                    electionTimeTaken >= 0);
+        }
+
+       //setup servers 1-5 to be followers
+       setUp(false);
+        
+       Proposal p = new Proposal();
+       
+        p.addQuorumVerifier(s1.getQuorumVerifier());
+        
+        // 2 followers out of 5 is not a majority
+        p.addAck(Long.valueOf(1));
+        p.addAck(Long.valueOf(2));        
+        Assert.assertEquals(false, p.hasAllQuorums());
+        
+        // 6 is not in the view - its vote shouldn't count
+        p.addAck(Long.valueOf(6));  
+        Assert.assertEquals(false, p.hasAllQuorums());
+        
+        // 3 followers out of 5 are a majority of the voting view
+        p.addAck(Long.valueOf(3));  
+        Assert.assertEquals(true, p.hasAllQuorums());
+        
+       //setup servers 1-3 to be followers and 4 and 5 to be observers
+       setUp(true);
+       
+       p = new Proposal();
+       p.addQuorumVerifier(s1.getQuorumVerifier());
+        
+        // 1 follower out of 3 is not a majority
+       p.addAck(Long.valueOf(1));      
+        Assert.assertEquals(false, p.hasAllQuorums());
+        
+        // 4 and 5 are observers, their vote shouldn't count
+        p.addAck(Long.valueOf(4));
+        p.addAck(Long.valueOf(5));
+        Assert.assertEquals(false, p.hasAllQuorums());
+        
+        // 6 is not in the view - its vote shouldn't count
+        p.addAck(Long.valueOf(6));
+        Assert.assertEquals(false, p.hasAllQuorums());
+        
+        // 2 followers out of 3 are a majority of the voting view
+        p.addAck(Long.valueOf(2));
+        Assert.assertEquals(true, p.hasAllQuorums());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java
new file mode 100644
index 0000000..415bb7d
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Quotas;
+import org.apache.zookeeper.StatsTrack;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeperMain;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QuorumQuotaTest extends QuorumBase {
+
+    /** Verifies the stats tracked under the quota node of /a after 300, then 600, children exist. */
+    @Test
+    public void testQuotaWithQuorum() throws Exception {
+        ZooKeeper zk = createClient();
+        zk.setData("/", "some".getBytes(), -1);
+        zk.create("/a", "some".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        for (int i = 0; i < 300; i++) {
+            zk.create("/a/" + i, "some".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        ZooKeeperMain.createQuota(zk, "/a", 1000L, 5000);
+        String statPath = Quotas.quotaZookeeper + "/a"+ "/" + Quotas.statNode;
+        byte[] data = zk.getData(statPath, false, new Stat());
+        StatsTrack st = new StatsTrack(new String(data));
+        // consistent with 301 nodes (/a plus 300 children) holding 4 data bytes ("some") each
+        Assert.assertEquals("bytes are set", 1204L, st.getBytes());
+        Assert.assertEquals("num count is set", 301, st.getCount());
+        for (int i = 300; i < 600; i++) {
+            zk.create("/a/" + i, "some".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        data = zk.getData(statPath, false, new Stat());
+        st = new StatsTrack(new String(data));
+        // consistent with 601 nodes (/a plus 600 children) holding 4 data bytes each
+        Assert.assertEquals("bytes are set", 2404L, st.getBytes());
+        Assert.assertEquals("num count is set", 601, st.getCount());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
new file mode 100644
index 0000000..20388d8
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.LearnerHandler;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QuorumTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(QuorumTest.class);
+    // same timeout as ClientTest; used when waiting for a client to connect
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+
+    // quorum shared by all tests; set up and torn down around each test
+    private final QuorumBase qb = new QuorumBase();
+    // client test delegate; setUp() points its hostPort at qb's hostPort
+    private final ClientTest ct = new ClientTest();
+    // only assigned by tests that build their own ensemble; tearDown() cleans it up when non-null
+    private QuorumUtil qu;
+
+    /**
+     * Runs the {@link QuorumBase} setup, then hands the quorum's connect
+     * string to the {@link ClientTest} delegate before running its setup.
+     */
+    @Before
+    public void setUp() throws Exception {
+        qb.setUp();
+        // the delegate must target the quorum, so copy hostPort before setUpAll()
+        ct.hostPort = qb.hostPort;
+        ct.setUpAll();
+    }
+
+    /**
+     * Tears down the client test delegate, the shared quorum and, if a test
+     * created one, the per-test {@link QuorumUtil}. Each step runs even when
+     * an earlier one throws, so a failing step cannot leave the resources of
+     * the later steps behind.
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            ct.tearDownAll();
+        } finally {
+            try {
+                qb.tearDown();
+            } finally {
+                if (qu != null) {
+                    qu.tearDown();
+                }
+            }
+        }
+    }
+
+    /** Runs {@code ClientTest.testDeleteWithChildren()} with the delegate pointed at the quorum. */
+    @Test
+    public void testDeleteWithChildren() throws Exception {
+        ct.testDeleteWithChildren();
+    }
+
+    /** Runs {@code ClientTest.testPing()} with the delegate pointed at the quorum. */
+    @Test
+    public void testPing() throws Exception {
+        ct.testPing();
+    }
+
+    /** Runs {@code ClientTest.testSequentialNodeNames()} with the delegate pointed at the quorum. */
+    @Test
+    public void testSequentialNodeNames()
+        throws IOException, InterruptedException, KeeperException
+    {
+        ct.testSequentialNodeNames();
+    }
+
+    /** Runs {@code ClientTest.testACLs()} with the delegate pointed at the quorum. */
+    @Test
+    public void testACLs() throws Exception {
+        ct.testACLs();
+    }
+
+    /** Runs {@code ClientTest.testClientwithoutWatcherObj()} with the delegate pointed at the quorum. */
+    @Test
+    public void testClientwithoutWatcherObj() throws IOException,
+            InterruptedException, KeeperException
+    {
+        ct.testClientwithoutWatcherObj();
+    }
+
+    /** Runs {@code ClientTest.testClientWithWatcherObj()} with the delegate pointed at the quorum. */
+    @Test
+    public void testClientWithWatcherObj() throws IOException,
+            InterruptedException, KeeperException
+    {
+        ct.testClientWithWatcherObj();
+    }
+
+    /** Each of the five peers of the shared quorum must report a view of five servers. */
+    @Test
+    public void testGetView() {
+        final int expectedViewSize = 5;
+        Assert.assertEquals(expectedViewSize, qb.s1.getView().size());
+        Assert.assertEquals(expectedViewSize, qb.s2.getView().size());
+        Assert.assertEquals(expectedViewSize, qb.s3.getView().size());
+        Assert.assertEquals(expectedViewSize, qb.s4.getView().size());
+        Assert.assertEquals(expectedViewSize, qb.s5.getView().size());
+    }
+
+    /**
+     * Checks {@code viewContains} on the first peer for its own id, for the
+     * id of another peer, and for an id that no server uses.
+     */
+    @Test
+    public void testViewContains() {
+        // Test view contains self
+        Assert.assertTrue(qb.s1.viewContains(qb.s1.getId()));
+
+        // Test view contains other servers
+        Assert.assertTrue(qb.s1.viewContains(qb.s2.getId()));
+
+        // Test view does not contain non-existent servers
+        Assert.assertFalse(qb.s1.viewContains(-1L));
+    }
+
+    // updated by the async callbacks of testLeaderShutdown; not asserted on
+    volatile int counter = 0;
+    volatile int errors = 0;
+
+    /**
+     * Queues 5000 asynchronous writes, shuts down the input side of every
+     * socket the leader holds to its forwarding followers, queues 5000 more
+     * writes and finally checks that all five peers still report
+     * {@code isAlive()}.
+     */
+    @Test
+    public void testLeaderShutdown() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = new DisconnectableZooKeeper(qb.hostPort, ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+            public void process(WatchedEvent event) {
+        }});
+        try {
+            zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create("/blah/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Leader leader = qb.s1.leader;
+            if (leader == null) leader = qb.s2.leader;
+            if (leader == null) leader = qb.s3.leader;
+            if (leader == null) leader = qb.s4.leader;
+            if (leader == null) leader = qb.s5.leader;
+            Assert.assertNotNull(leader);
+
+            // one callback serves both bursts; it only keeps the tallies
+            AsyncCallback.StatCallback tally = new AsyncCallback.StatCallback() {
+                public void processResult(int rc, String path, Object ctx,
+                        Stat stat) {
+                    counter++;
+                    if (rc != 0) {
+                        errors++;
+                    }
+                }
+            };
+            for(int i = 0; i < 5000; i++) {
+                zk.setData("/blah/blah", new byte[0], -1, tally, null);
+            }
+            for(LearnerHandler f : leader.getForwardingFollowers()) {
+                f.getSocket().shutdownInput();
+            }
+            for(int i = 0; i < 5000; i++) {
+                zk.setData("/blah/blah", new byte[0], -1, tally, null);
+            }
+            // check if all the followers are alive
+            Assert.assertTrue(qb.s1.isAlive());
+            Assert.assertTrue(qb.s2.isAlive());
+            Assert.assertTrue(qb.s3.isAlive());
+            Assert.assertTrue(qb.s4.isAlive());
+            Assert.assertTrue(qb.s5.isAlive());
+        } finally {
+            // release the client handle even when an assertion above fails
+            zk.close();
+        }
+    }
+
+    /** Runs {@code ClientTest.testMutipleWatcherObjs()} with the delegate pointed at the quorum. */
+    @Test
+    public void testMultipleWatcherObjs() throws IOException,
+            InterruptedException, KeeperException
+    {
+        ct.testMutipleWatcherObjs();
+    }
+
+    /**
+     * Make sure that we can change sessions
+     *  from follower to leader.
+     *
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws KeeperException
+     */
+    @Test
+    public void testSessionMoved() throws Exception {
+        String hostPorts[] = qb.hostPort.split(",");
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
+                ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/sessionMoveTest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        // we want to loop through the list twice
+        for(int i = 0; i < hostPorts.length*2; i++) {
+            zk.dontReconnect();
+            // This should stomp the zk handle
+            DisconnectableZooKeeper zknew =
+                new DisconnectableZooKeeper(hostPorts[(i+1)%hostPorts.length],
+                    ClientBase.CONNECTION_TIMEOUT,
+                    new Watcher() {public void process(WatchedEvent event) {
+                    }},
+                    zk.getSessionId(),
+                    zk.getSessionPasswd());
+            zknew.setData("/", new byte[1], -1);
+            final int result[] = new int[1];
+            result[0] = Integer.MAX_VALUE;
+            zknew.sync("/", new AsyncCallback.VoidCallback() {
+                    public void processResult(int rc, String path, Object ctx) {
+                        synchronized(result) { result[0] = rc; result.notify(); }
+                    }
+                }, null);
+            synchronized(result) {
+                if(result[0] == Integer.MAX_VALUE) {
+                    result.wait(5000);
+                }
+            }
+            LOG.info(hostPorts[(i+1)%hostPorts.length] + " Sync returned " + result[0]);
+            Assert.assertTrue(result[0] == KeeperException.Code.OK.intValue());
+            try {
+                zk.setData("/", new byte[1], -1);
+                Assert.fail("Should have lost the connection");
+            } catch(KeeperException.ConnectionLossException e) {
+            }
+            zk = zknew;
+        }
+        zk.close();
+    }
+
+    /** Watcher that latches {@code zkDisco} once a Disconnected event has been seen. */
+    private static class DiscoWatcher implements Watcher {
+        volatile boolean zkDisco = false;
+        public void process(WatchedEvent event) {
+            if (event.getState() != KeeperState.Disconnected) {
+                return;
+            }
+            zkDisco = true;
+        }
+    }
+
+    /**
+     * Make sure the previous connection closed after session move within
+     * multiop.
+     * <p>
+     * A session is created through the first server and then taken over by a
+     * second handle connected to the next server. A multi op issued through
+     * the original handle afterwards must fail with a connection loss.
+     */
+    @Test
+    public void testSessionMovedWithMultiOp() throws Exception {
+        String hostPorts[] = qb.hostPort.split(",");
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
+                ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        ZooKeeper zknew = null;
+        try {
+            zk.multi(Arrays.asList(
+                Op.create("/testSessionMovedWithMultiOp", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
+            ));
+
+            // session moved to the next server
+            zknew = new ZooKeeper(hostPorts[1],
+                ClientBase.CONNECTION_TIMEOUT,
+                new Watcher() {public void process(WatchedEvent event) {
+                }},
+                zk.getSessionId(),
+                zk.getSessionPasswd());
+            zknew.multi(Arrays.asList(
+                Op.create("/testSessionMovedWithMultiOp-1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
+            ));
+
+            // try to issue the multi op again from the old connection
+            // expect to have ConnectionLossException instead of keep
+            // getting SessionMovedException
+            try {
+                zk.multi(Arrays.asList(
+                    Op.create("/testSessionMovedWithMultiOp-Failed",
+                        new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
+                ));
+                Assert.fail("Should have lost the connection");
+            } catch (KeeperException.ConnectionLossException e) {
+                // expected
+            }
+        } finally {
+            // close both handles on every path, old handle first
+            try {
+                zk.close();
+            } finally {
+                if (zknew != null) {
+                    zknew.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Connect to two different servers with two different handles using the same session and
+     * make sure we cannot do any changes.
+     * <p>
+     * Currently disabled via {@code @Ignore}.
+     */
+    @Test
+    @Ignore
+    public void testSessionMove() throws Exception {
+        String hps[] = qb.hostPort.split(",");
+        DiscoWatcher oldWatcher = new DiscoWatcher();
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hps[0],
+                ClientBase.CONNECTION_TIMEOUT, oldWatcher);
+        zk.create("/t1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        zk.dontReconnect();
+        // This should stomp the zk handle
+        DiscoWatcher watcher = new DiscoWatcher();
+        DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hps[1],
+                ClientBase.CONNECTION_TIMEOUT, watcher, zk.getSessionId(),
+                zk.getSessionPasswd());
+        zknew.create("/t2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        try {
+            zk.create("/t3", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+            Assert.fail("Should have lost the connection");
+        } catch(KeeperException.ConnectionLossException e) {
+            // wait up to 30 seconds for the disco to be delivered
+            for (int i = 0; i < 30; i++) {
+                if (oldWatcher.zkDisco) {
+                    break;
+                }
+                Thread.sleep(1000);
+            }
+            Assert.assertTrue(oldWatcher.zkDisco);
+        }
+
+        // every handle created from here on is collected so it can be closed at the end
+        ArrayList<ZooKeeper> toClose = new ArrayList<ZooKeeper>();
+        toClose.add(zknew);
+        // Let's just make sure it can still move
+        // NOTE(review): each new handle connects to hps[1] again, so the session
+        // is re-attached to the same server rather than moved - confirm intent
+        for(int i = 0; i < 10; i++) {
+            zknew.dontReconnect();
+            zknew = new DisconnectableZooKeeper(hps[1],
+                    ClientBase.CONNECTION_TIMEOUT, new DiscoWatcher(),
+                    zk.getSessionId(), zk.getSessionPasswd());
+            toClose.add(zknew);
+            zknew.create("/t-"+i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+        for (ZooKeeper z: toClose) {
+            z.close();
+        }
+        zk.close();
+    }
+
+    /**
+     * See ZOOKEEPER-790 for details
+     * <p>
+     * Starts the first N+1 peers of a three-server ensemble, shuts the leader
+     * down and starts it again, then checks that a client can connect through
+     * one of the other peers.
+     */
+    @Test
+    public void testFollowersStartAfterLeader() throws Exception {
+        qu = new QuorumUtil(1);
+        CountdownWatcher watcher = new CountdownWatcher();
+        qu.startQuorum();
+
+        // fails with a clear assertion when no peer is leading, instead of
+        // walking past the last peer and hitting a NullPointerException
+        int index = qu.getLeaderServer();
+
+        // break the quorum
+        qu.shutdown(index);
+
+        // try to reestablish the quorum
+        qu.start(index);
+
+        // Connect the client after services are restarted (otherwise we would get
+        // SessionExpiredException as the previous local session was not persisted).
+        ZooKeeper zk = new ZooKeeper(
+                "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
+                ClientBase.CONNECTION_TIMEOUT, watcher);
+
+        try {
+            try{
+                watcher.waitForConnected(CONNECTION_TIMEOUT);
+            } catch(TimeoutException e) {
+                Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
+            }
+        } finally {
+            // release the client handle on the failure path too
+            zk.close();
+        }
+    }
+
+    // skip superhammer and clientcleanup as they are too expensive for quorum
+
+    /**
+     * Tests if a multiop submitted to a non-leader propagates to the leader properly
+     * (see ZOOKEEPER-1124).
+     *
+     * The test works as follows. It has a client connect to a follower and submit a multiop
+     * to the follower. It then verifies that the multiop successfully gets committed by the leader.
+     *
+     * Without the fix in ZOOKEEPER-1124, this fails with a ConnectionLoss KeeperException.
+     */
+    @Test
+    public void testMultiToFollower() throws Exception {
+        qu = new QuorumUtil(1);
+        CountdownWatcher watcher = new CountdownWatcher();
+        qu.startQuorum();
+
+        // fails with a clear assertion when no peer is leading, instead of
+        // walking past the last peer and hitting a NullPointerException
+        int index = qu.getLeaderServer();
+
+        // connect to a started peer that is not the leader
+        ZooKeeper zk = new ZooKeeper(
+                "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
+                ClientBase.CONNECTION_TIMEOUT, watcher);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+            zk.multi(Arrays.asList(
+                    Op.create("/multi0", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                    Op.create("/multi1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                    Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+                    ));
+            // reading each node back throws if the multiop was not applied
+            zk.getData("/multi0", false, null);
+            zk.getData("/multi1", false, null);
+            zk.getData("/multi2", false, null);
+        } finally {
+            // release the client handle on the failure path too
+            zk.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
new file mode 100644
index 0000000..314171d
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.util.OSMXBean;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for quorum testing. Sets up 2n+1 peers and allows starting/stopping
+ * all peers, a particular peer, n peers, etc.
+ */
+public class QuorumUtil {
+
+    // TODO partitioning of peers and clients
+
+    // TODO refactor QuorumBase to be special case of this
+
+    private static final Logger LOG = LoggerFactory.getLogger(QuorumUtil.class);
+
+    /** Per-server record: id, current peer instance, data directory and client port. */
+    public static class PeerStruct {
+        // server id; the QuorumUtil constructor hands out 1..ALL
+        public int id;
+        // current QuorumPeer instance; replaced whenever the server is started again
+        public QuorumPeer peer;
+        // temporary directory, passed to QuorumPeer for both of its directory arguments
+        public File dataDir;
+        // port clients connect to on 127.0.0.1
+        public int clientPort;
+    }
+
+    // server id -> QuorumServer; the same map is passed to every QuorumPeer that is created
+    private final Map<Long, QuorumServer> peersView = new HashMap<Long, QuorumServer>();
+
+    // server id -> bookkeeping record for that server
+    private final Map<Integer, PeerStruct> peers = new HashMap<Integer, PeerStruct>();
+
+    // the n given to the constructor
+    public final int N;
+
+    // total number of servers, 2 * N + 1
+    public final int ALL;
+
+    // comma-separated "127.0.0.1:<clientPort>" entries, one per server
+    private String hostPort;
+
+    // the five settings below are passed to each QuorumPeer constructor
+    private int tickTime;
+
+    private int initLimit;
+
+    private int syncLimit;
+
+    private int electionAlg;
+
+    // when true, start(int) calls enableLocalSessions(true) on the new peer
+    private boolean localSessionEnabled;
+
+    /**
+     * Initializes 2n+1 quorum peers which will form a ZooKeeper ensemble.
+     * Every server gets a temporary data directory and unique ports on
+     * 127.0.0.1; the peers are created here but not started.
+     *
+     * @param n
+     *            number of peers in the ensemble will be 2n+1
+     * @param syncLimit
+     *            sync limit passed to every {@link QuorumPeer}
+     * @throws RuntimeException
+     *             wrapping any exception raised while setting up
+     */
+    public QuorumUtil(int n, int syncLimit) throws RuntimeException {
+        try {
+            ClientBase.setupTestEnv();
+            JMXEnv.setUp();
+
+            N = n;
+            ALL = 2 * N + 1;
+            tickTime = 2000;
+            initLimit = 3;
+            this.syncLimit = syncLimit;
+            electionAlg = 3;
+
+            // first pass: allocate directories and ports, build the shared view
+            StringBuilder connectString = new StringBuilder();
+            for (int id = 1; id <= ALL; ++id) {
+                PeerStruct struct = new PeerStruct();
+                struct.id = id;
+                struct.dataDir = ClientBase.createTmpDir();
+                struct.clientPort = PortAssignment.unique();
+                peers.put(id, struct);
+
+                QuorumServer server = new QuorumServer(id,
+                        new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                        new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                        new InetSocketAddress("127.0.0.1", struct.clientPort),
+                        LearnerType.PARTICIPANT);
+                peersView.put(Long.valueOf(id), server);
+
+                connectString.append("127.0.0.1:").append(struct.clientPort);
+                if (id != ALL) {
+                    connectString.append(",");
+                }
+            }
+            hostPort = connectString.toString();
+
+            // second pass: the view is complete, so the peers can be created
+            for (int id = 1; id <= ALL; ++id) {
+                PeerStruct struct = peers.get(id);
+                LOG.info("Creating QuorumPeer " + id + "; public port " + struct.clientPort);
+                struct.peer = new QuorumPeer(peersView, struct.dataDir, struct.dataDir,
+                        struct.clientPort, electionAlg, struct.id, tickTime, initLimit,
+                        syncLimit);
+                Assert.assertEquals(struct.clientPort, struct.peer.getClientPort());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Same as {@link #QuorumUtil(int, int)} with a sync limit of 3.
+     *
+     * @param n
+     *            number of peers in the ensemble will be 2n+1
+     */
+    public QuorumUtil(int n) throws RuntimeException {
+        this(n, 3);
+    }
+
+    /**
+     * Looks up the bookkeeping record of a server.
+     *
+     * @param id server id
+     * @return the record, or {@code null} when no server has that id
+     */
+    public PeerStruct getPeer(int id) {
+        return peers.get(id);
+    }
+
+    // This was added to avoid running into the problem of ZOOKEEPER-1539
+    public boolean disableJMXTest = false;
+    
+
+    /**
+     * Sets the flag read when a peer is started; peers that are already
+     * running are not touched by this call.
+     *
+     * @param localSessionEnabled whether newly started peers get local sessions enabled
+     */
+    public void enableLocalSession(boolean localSessionEnabled) {
+        this.localSessionEnabled = localSessionEnabled;
+    }
+
+    /**
+     * Shuts down and then starts every server, and waits until each one
+     * accepts client connections. Unless {@link #disableJMXTest} is set, the
+     * JMX environment is then dumped and checked for the bean names expected
+     * for each server.
+     *
+     * @throws IOException propagated from {@link #start(int)}
+     */
+    public void startAll() throws IOException {
+        shutdownAll();
+        for (int i = 1; i <= ALL; ++i) {
+            start(i);
+            LOG.info("Started QuorumPeer {}", i);
+        }
+
+        LOG.info("Checking ports {}", hostPort);
+        for (String hp : hostPort.split(",")) {
+            Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(hp,
+                    ClientBase.CONNECTION_TIMEOUT));
+            LOG.info("{} is accepting client connections", hp);
+        }
+
+        // This was added to avoid running into the problem of ZOOKEEPER-1539
+        if (disableJMXTest) return;
+
+        // interesting to see what's there...
+        try {
+            JMXEnv.dump();
+            // make sure we have all servers listed
+            Set<String> ensureNames = new LinkedHashSet<String>();
+            // the set holds this name once no matter how often it is added
+            ensureNames.add("InMemoryDataTree");
+            for (int i = 1; i <= ALL; ++i) {
+                ensureNames
+                        .add("name0=ReplicatedServer_id" + i + ",name1=replica." + i + ",name2=");
+            }
+            for (int i = 1; i <= ALL; ++i) {
+                for (int j = 1; j <= ALL; ++j) {
+                    ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + j);
+                }
+            }
+            for (int i = 1; i <= ALL; ++i) {
+                ensureNames.add("name0=ReplicatedServer_id" + i);
+            }
+            JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
+        } catch (IOException e) {
+            LOG.warn("IOException during JMXEnv operation", e);
+        } catch (InterruptedException e) {
+            LOG.warn("InterruptedException during JMXEnv operation", e);
+            // restore the flag so the caller can still see the interruption
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Start first N+1 peers, which is the smallest majority of the 2N+1
+     * servers, and wait until each of them accepts client connections.
+     */
+    public void startQuorum() throws IOException {
+        shutdownAll();
+        final int quorumSize = N + 1;
+        for (int id = 1; id <= quorumSize; ++id) {
+            start(id);
+        }
+        for (int id = 1; id <= quorumSize; ++id) {
+            final String address = "127.0.0.1:" + getPeer(id).clientPort;
+            final boolean up = ClientBase.waitForServerUp(address, ClientBase.CONNECTION_TIMEOUT);
+            Assert.assertTrue("Waiting for server up", up);
+        }
+    }
+
+    /**
+     * Creates a new {@link QuorumPeer} for the given server, stores it in the
+     * server's record and starts it. Does not wait for the server to come up.
+     *
+     * @param id server id
+     */
+    public void start(int id) throws IOException {
+        final PeerStruct struct = getPeer(id);
+        LOG.info("Creating QuorumPeer " + struct.id + "; public port " + struct.clientPort);
+        final QuorumPeer freshPeer = new QuorumPeer(peersView, struct.dataDir, struct.dataDir,
+                struct.clientPort, electionAlg, struct.id, tickTime, initLimit, syncLimit);
+        struct.peer = freshPeer;
+        if (localSessionEnabled) {
+            freshPeer.enableLocalSessions(true);
+        }
+        Assert.assertEquals(struct.clientPort, freshPeer.getClientPort());
+
+        freshPeer.start();
+    }
+
+    /**
+     * Starts the given server and waits until it accepts client connections.
+     *
+     * @param id server id
+     */
+    public void restart(int id) throws IOException {
+        start(id);
+        final String address = "127.0.0.1:" + getPeer(id).clientPort;
+        final boolean up = ClientBase.waitForServerUp(address, ClientBase.CONNECTION_TIMEOUT);
+        Assert.assertTrue("Waiting for server up", up);
+    }
+
+    /**
+     * Starts the given server, waits until it accepts client connections and
+     * then shuts it down again.
+     *
+     * @param id server id
+     */
+    public void startThenShutdown(int id) throws IOException {
+        // restart(id) is exactly "start and wait for server up"
+        restart(id);
+        shutdown(id);
+    }
+
+    public void shutdownAll() {
+        for (int i = 1; i <= ALL; ++i) {
+            shutdown(i);
+        }
+        for (String hp : hostPort.split(",")) {
+            Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown(hp,
+                    ClientBase.CONNECTION_TIMEOUT));
+            LOG.info(hp + " is no longer accepting client connections");
+        }
+    }
+
+    /**
+     * Shuts down the given server and its election, then waits up to 30
+     * seconds for the peer thread to exit; fails the test if it is still
+     * alive afterwards.
+     *
+     * @param id server id
+     */
+    public void shutdown(int id) {
+        QuorumPeer qp = getPeer(id).peer;
+        try {
+            LOG.info("Shutting down quorum peer {}", qp.getName());
+            qp.shutdown();
+            Election e = qp.getElectionAlg();
+            if (e != null) {
+                LOG.info("Shutting down leader election {}", qp.getName());
+                e.shutdown();
+            } else {
+                LOG.info("No election available to shutdown {}", qp.getName());
+            }
+            LOG.info("Waiting for {} to exit thread", qp.getName());
+            qp.join(30000);
+            if (qp.isAlive()) {
+                Assert.fail("QP failed to shutdown in 30 seconds: " + qp.getName());
+            }
+        } catch (InterruptedException e) {
+            LOG.debug("QP interrupted: " + qp.getName(), e);
+            // restore the flag so the caller can still see the interruption
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** @return comma-separated "127.0.0.1:port" entries covering every server */
+    public String getConnString() {
+        return hostPort;
+    }
+
+    /**
+     * @param peer peer to connect to
+     * @return "127.0.0.1:port" using the client port reported by the peer
+     */
+    public String getConnectString(QuorumPeer peer) {
+        return "127.0.0.1:" + peer.getClientPort();
+    }
+
+    /**
+     * Finds a peer whose {@code leader} field is set.
+     *
+     * @return the first such peer encountered
+     * @throws RuntimeException when no peer has a leader
+     */
+    public QuorumPeer getLeaderQuorumPeer() {
+        for (PeerStruct struct : peers.values()) {
+            final QuorumPeer candidate = struct.peer;
+            if (candidate.leader == null) {
+                continue;
+            }
+            return candidate;
+        }
+        throw new RuntimeException("Unable to find a leader peer");
+    }
+
+    /**
+     * Collects every peer whose {@code leader} field is not set.
+     *
+     * @return unmodifiable list of those peers
+     */
+    public List<QuorumPeer> getFollowerQuorumPeers() {
+        final List<QuorumPeer> followers = new ArrayList<QuorumPeer>(ALL - 1);
+
+        for (PeerStruct struct : peers.values()) {
+            final QuorumPeer candidate = struct.peer;
+            if (candidate.leader != null) {
+                continue;
+            }
+            followers.add(candidate);
+        }
+
+        return Collections.unmodifiableList(followers);
+    }
+
+    /**
+     * Logs the open file descriptor count when {@code OSMXBean} reports a
+     * Unix system, shuts down every server and tears down the JMX environment.
+     */
+    public void tearDown() throws Exception {
+        LOG.info("TearDown started");
+
+        final OSMXBean osBean = new OSMXBean();
+        if (osBean.getUnix()) {
+            LOG.info("fdcount after test is: " + osBean.getOpenFileDescriptorCount());
+        }
+
+        shutdownAll();
+        JMXEnv.tearDown();
+    }
+
+    /**
+     * Finds the lowest server id whose peer has its {@code leader} field set
+     * and fails the test when there is none.
+     *
+     * @return id of the leading server
+     */
+    public int getLeaderServer() {
+        int leaderId = 0;
+        int candidate = 1;
+        while (leaderId == 0 && candidate <= ALL) {
+            if (getPeer(candidate).peer.leader != null) {
+                leaderId = candidate;
+            }
+            candidate++;
+        }
+
+        Assert.assertTrue("Leader server not found.", leaderId > 0);
+        return leaderId;
+    }
+
+    /**
+     * @param index server id
+     * @return "127.0.0.1:port" using the client port recorded for that server
+     */
+    public String getConnectionStringForServer(final int index) {
+        return "127.0.0.1:" + getPeer(index).clientPort;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtilTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtilTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtilTest.java
new file mode 100644
index 0000000..76e6df0
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtilTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is intended to ensure the correct functionality of
+ * {@link QuorumUtil} helper.
+ */
+public class QuorumUtilTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(QuorumUtilTest.class);
+
+    /**
+     * <p>
+     * This test ensures that all JMX beans associated with a
+     * {@link QuorumPeer} are unregistered when it is shut down
+     * ({@link QuorumUtil#shutdown(int)}). That allows several zookeeper
+     * servers ({@link QuorumPeer}) running in the same JVM to be restarted
+     * successfully.
+     * <p>
+     * See ZOOKEEPER-1214 for details.
+     */
+    @Test
+    public void validateAllMXBeanAreUnregistered() throws IOException {
+        QuorumUtil qU = new QuorumUtil(1);
+        LOG.info(">-->> Starting up all servers...");
+        qU.startAll();
+        LOG.info(">-->> Servers up and running...");
+
+        final int leaderIndex = qU.getLeaderServer();
+        if (leaderIndex < 1 || leaderIndex > 3) {
+            Assert.fail("Unexpected leaderIndex value: " + leaderIndex);
+        }
+        // the followers are the two ids out of 1, 2, 3 that are not the leader
+        final int firstFollowerIndex = (leaderIndex == 1) ? 2 : 1;
+        final int secondFollowerIndex = (leaderIndex == 3) ? 2 : 3;
+
+        LOG.info(">-->> Shuting down server [{}]", firstFollowerIndex);
+        qU.shutdown(firstFollowerIndex);
+        LOG.info(">-->> Shuting down server [{}]", secondFollowerIndex);
+        qU.shutdown(secondFollowerIndex);
+        LOG.info(">-->> Restarting server [{}]", firstFollowerIndex);
+        qU.restart(firstFollowerIndex);
+        LOG.info(">-->> Restarting server [{}]", secondFollowerIndex);
+        qU.restart(secondFollowerIndex);
+
+        qU.shutdownAll();
+        final Set<ZKMBeanInfo> pending = MBeanRegistry.getInstance()
+                .getRegisteredBeans();
+        final String message = "The following beans should have been unregistered: "
+                + pending;
+        Assert.assertTrue(message, pending.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumZxidSyncTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumZxidSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumZxidSyncTest.java
new file mode 100644
index 0000000..61556bf
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumZxidSyncTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QuorumZxidSyncTest extends ZKTestCase {
+    QuorumBase qb = new QuorumBase();
+
+    /** Runs the {@link QuorumBase} fixture's own setUp before each test. */
+    @Before
+    public void setUp() throws Exception {
+        qb.setUp();
+    }
+
+    /**
+     * find out what happens when a follower connects to leader that is behind
+     */
+    @Test
+    public void testBehindLeader() throws Exception {
+        // crank up the epoch numbers
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        cleanAndInitializeDataDir(qb.s1dir);
+        cleanAndInitializeDataDir(qb.s2dir);
+        cleanAndInitializeDataDir(qb.s3dir);
+        cleanAndInitializeDataDir(qb.s4dir);
+        qb.setupServers();
+        qb.s1.start();
+        qb.s2.start();
+        qb.s3.start();
+        qb.s4.start();
+        Assert.assertTrue("Servers didn't come up", ClientBase.waitForServerUp(qb.hostPort, 10000));
+        qb.s5.start();
+        String hostPort = "127.0.0.1:" + qb.s5.getClientPort();
+        Assert.assertFalse("Servers came up, but shouldn't have since it's ahead of leader",
+                ClientBase.waitForServerUp(hostPort, 10000));
+    }
+
+    /**
+     * Deletes the files directly inside {@code f/version-2} and then calls
+     * {@code ClientBase.createInitializeFile(f)}.
+     *
+     * @param f server data directory that contains the {@code version-2} folder
+     * @throws IOException if {@code ClientBase.createInitializeFile} fails
+     */
+    private void cleanAndInitializeDataDir(File f) throws IOException {
+        File v = new File(f, "version-2");
+        File[] children = v.listFiles();
+        // listFiles() returns null (not an empty array) when v cannot be
+        // listed; fail with a readable message instead of a bare NPE
+        Assert.assertNotNull("Cannot list files in " + v, children);
+        for (File c : children) {
+            c.delete(); // return value is ignored, as before
+        }
+        ClientBase.createInitializeFile(f);
+    }
+
+    /**
+     * Find out what happens when the latest state is in the snapshots, not
+     * the logs.
+     * <p>
+     * Writes /0 and /1 with ensemble restarts in between, deletes the
+     * "log*" files of all five servers while they are down, then writes /2,
+     * restarts once more and checks that "2" is still listed under the root.
+     */
+    @Test
+    public void testLateLogs() throws Exception {
+        final Watcher ignoreEvents = new Watcher() {
+            public void process(WatchedEvent event) {
+                // events are of no interest here
+            }
+        };
+
+        // crank up the epoch numbers
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ZooKeeper client = new ZooKeeper(qb.hostPort, 10000, ignoreEvents);
+        client.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        client.close();
+        for (int round = 0; round < 2; round++) {
+            qb.shutdownServers();
+            qb.startServers();
+            ClientBase.waitForServerUp(qb.hostPort, 10000);
+        }
+
+        client = new ZooKeeper(qb.hostPort, 10000, ignoreEvents);
+        client.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        client.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+
+        // drop the "log*" files of every server while the ensemble is down
+        qb.shutdownServers();
+        for (File dataDir : new File[] {qb.s1dir, qb.s2dir, qb.s3dir, qb.s4dir, qb.s5dir}) {
+            deleteLogs(dataDir);
+        }
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+
+        client = new ZooKeeper(qb.hostPort, 10000, ignoreEvents);
+        client.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        client.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+
+        // after the final restart the last write must still be visible
+        client = new ZooKeeper(qb.hostPort, 10000, ignoreEvents);
+        boolean saw2 = client.getChildren("/", false).contains("2");
+        client.close();
+        Assert.assertTrue("Didn't see /2 (went back in time)", saw2);
+    }
+
+    /**
+     * Deletes every file in {@code f/version-2} whose name starts with
+     * {@code "log"}; other files in that directory are left in place.
+     *
+     * @param f server data directory that contains the {@code version-2} folder
+     */
+    private void deleteLogs(File f) {
+        File v = new File(f, "version-2");
+        File[] children = v.listFiles();
+        // listFiles() returns null (not an empty array) when v cannot be
+        // listed; fail with a readable message instead of a bare NPE
+        Assert.assertNotNull("Cannot list files in " + v, children);
+        for (File c : children) {
+            if (c.getName().startsWith("log")) {
+                c.delete(); // return value is ignored, as before
+            }
+        }
+    }
+
+    /** Runs the {@link QuorumBase} fixture's own tearDown after each test. */
+    @After
+    public void tearDown() throws Exception {
+        qb.tearDown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java
new file mode 100644
index 0000000..68c7182
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.LineNumberReader;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+
+import org.apache.log4j.Layout;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.WriterAppender;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NotReadOnlyException;
+import org.apache.zookeeper.Transaction;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+public class ReadOnlyModeTest extends ZKTestCase {
+    private static final org.slf4j.Logger LOG = LoggerFactory
+            .getLogger(ReadOnlyModeTest.class);
+    private static int CONNECTION_TIMEOUT = QuorumBase.CONNECTION_TIMEOUT;
+    private QuorumUtil qu = new QuorumUtil(1);
+
+    /**
+     * Sets the {@code readonlymode.enabled} system property to "true" and
+     * then calls {@code qu.startQuorum()} before each test.
+     */
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("readonlymode.enabled", "true");
+        qu.startQuorum();
+    }
+
+    /**
+     * Sets the {@code readonlymode.enabled} system property back to "false"
+     * and then calls {@code qu.tearDown()} after each test.
+     */
+    @After
+    public void tearDown() throws Exception {
+        System.setProperty("readonlymode.enabled", "false");
+        qu.tearDown();
+    }
+
+    /**
+     * Test write operations using multi request.
+     * <p>
+     * After server 2 is shut down the client is expected to be connected
+     * read-only: reading still works, while committing a multi transaction
+     * must fail with {@link NotReadOnlyException} and must not create the
+     * second znode.
+     */
+    @Test(timeout = 90000)
+    public void testMultiTransaction() throws Exception {
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected
+
+            final String data = "Data to be read in RO mode";
+            final String node1 = "/tnode1";
+            final String node2 = "/tnode2";
+            zk.create(node1, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+            watcher.reset();
+            qu.shutdown(2);
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            Assert.assertEquals("Should be in r-o mode", States.CONNECTEDREADONLY,
+                    zk.getState());
+
+            // read operation during r/o mode
+            String remoteData = new String(zk.getData(node1, false, null));
+            Assert.assertEquals("Failed to read data in r-o mode", data, remoteData);
+
+            try {
+                Transaction transaction = zk.transaction();
+                transaction.setData(node1, "no way".getBytes(), -1);
+                transaction.create(node2, data.getBytes(),
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                transaction.commit();
+                Assert.fail("Write operation using multi-transaction"
+                        + " api has succeeded during RO mode");
+            } catch (NotReadOnlyException e) {
+                // ok
+            }
+
+            // the rejected transaction must not have left node2 behind
+            Assert.assertNull("Should not have created the znode:" + node2,
+                    zk.exists(node2, false));
+        } finally {
+            // release the client even when an assertion above fails
+            zk.close();
+        }
+    }
+    
+    /**
+     * Basic test of read-only client functionality. Tries to read and write
+     * during read-only mode, then regains a quorum and tries to write again.
+     */
+    @Test(timeout = 90000)
+    public void testReadOnlyClient() throws Exception {
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
+        watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected
+
+        final String data = "Data to be read in RO mode";
+        final String node = "/tnode";
+        zk.create(node, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        watcher.reset();
+        qu.shutdown(2);
+        zk.close();
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // read operation during r/o mode
+        String remoteData = new String(zk.getData(node, false, null));
+        Assert.assertEquals(data, remoteData);
+
+        try {
+            zk.setData(node, "no way".getBytes(), -1);
+            Assert.fail("Write operation has succeeded during RO mode");
+        } catch (NotReadOnlyException e) {
+            // ok
+        }
+
+        watcher.reset();
+        qu.start(2);
+        Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(
+                "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT));
+        zk.close();
+        watcher.reset();
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+        zk.setData(node, "We're in the quorum now".getBytes(), -1);
+
+        zk.close();
+    }
+
+    /**
+     * Ensures that upon connection to a read-only server client receives
+     * ConnectedReadOnly state notification.
+     * <p>
+     * A znode is first created (retrying on connection loss) to prove the
+     * client is not read-only yet; server 2 is then shut down and a fresh
+     * client must reach {@code CONNECTEDREADONLY} and notify the watcher.
+     */
+    @Test(timeout = 90000)
+    public void testConnectionEvents() throws Exception {
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
+        try {
+            boolean success = false;
+            for (int i = 0; i < 30; i++) {
+                try {
+                    zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    success = true;
+                    break;
+                } catch (KeeperException.ConnectionLossException e) {
+                    // not connected yet: retry once per second, 30 attempts at most
+                    Thread.sleep(1000);
+                }
+            }
+            Assert.assertTrue("Did not succeed in connecting in 30s", success);
+            Assert.assertFalse("The connection should not be read-only yet", watcher.readOnlyConnected);
+
+            // kill peer and wait no more than 5 seconds for read-only server
+            // to be started (which should take one tickTime (2 seconds))
+            qu.shutdown(2);
+
+            // Close the first handle before replacing it: otherwise it is never
+            // released and keeps delivering its own state changes to the shared
+            // watcher that is checked below.
+            zk.close();
+
+            // Re-connect the client (in case we were connected to the shut down
+            // server and the local session was not persisted).
+            zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
+            long start = Time.currentElapsedTime();
+            while (zk.getState() != States.CONNECTEDREADONLY) {
+                Thread.sleep(200);
+                // FIXME this was originally 5 seconds, but realistically, on random/slow/virt hosts, there is no way to guarantee this
+                Assert.assertTrue("Can't connect to the server",
+                                  Time.currentElapsedTime() - start < 30000);
+            }
+
+            watcher.waitForReadOnlyConnected(5000);
+        } finally {
+            // release whichever handle is current, also when an assertion fails
+            zk.close();
+        }
+    }
+
+    /**
+     * Tests a situation when client firstly connects to a read-only server and
+     * then connects to a majority server. Transition should be transparent for
+     * the user.
+     */
+    @Test(timeout = 90000)
+    public void testSessionEstablishment() throws Exception {
+        qu.shutdown(2);
+
+        final CountdownWatcher connWatcher = new CountdownWatcher();
+        final ZooKeeper client = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                connWatcher, true);
+        connWatcher.waitForConnected(CONNECTION_TIMEOUT);
+        Assert.assertSame("should be in r/o mode", States.CONNECTEDREADONLY, client.getState());
+        final long readOnlySessionId = client.getSessionId();
+        LOG.info("Connected as r/o mode with state {} and session id {}",
+                client.getState(), readOnlySessionId);
+
+        connWatcher.reset();
+        qu.start(2);
+        boolean serverTwoUp = ClientBase.waitForServerUp(
+                "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT);
+        Assert.assertTrue("waiting for server up", serverTwoUp);
+        LOG.info("Server 127.0.0.1:{} is up", qu.getPeer(2).clientPort);
+
+        // ZOOKEEPER-2722: block until the client is attached to a read-write
+        // server once the quorum has formed. Without this wait the client may
+        // first reach a read-only server and then lose that connection when
+        // the read-only server shuts down for leader election / quorum
+        // forming with the newly started server. A zk.create issued in that
+        // window (read-only server gone, quorum not yet formed) would fail
+        // with a ConnectLossException.
+        connWatcher.waitForSyncConnected(CONNECTION_TIMEOUT);
+        Assert.assertEquals("Should be in read-write mode", States.CONNECTED, client.getState());
+        LOG.info("Connected as rw mode with state {} and session id {}",
+                client.getState(), client.getSessionId());
+        client.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // the session id seen in r/o mode must have been replaced
+        boolean sessionIdChanged = client.getSessionId() != readOnlySessionId;
+        Assert.assertTrue("fake session and real session have same id", sessionIdChanged);
+        client.close();
+    }
+
+    /**
+     * Ensures that client seeks for r/w servers while it's connected to r/o
+     * server.
+     * <p>
+     * The check is made on captured log output: an extra appender is attached
+     * to the "org.apache.zookeeper" logger and the captured text must contain
+     * a line with "Majority server found".
+     */
+    @SuppressWarnings("deprecation")
+    @Test(timeout = 90000)
+    public void testSeekForRwServer() throws Exception {
+        // setup the logger to capture all logs
+        Layout layout = Logger.getRootLogger().getAppender("CONSOLE")
+                .getLayout();
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        WriterAppender appender = new WriterAppender(layout, os);
+        appender.setImmediateFlush(true);
+        appender.setThreshold(Level.INFO);
+        Logger zlogger = Logger.getLogger("org.apache.zookeeper");
+        zlogger.addAppender(appender);
+
+        ZooKeeper zk = null;
+        boolean peerSuspended = false;
+        try {
+            qu.shutdown(2);
+            CountdownWatcher watcher = new CountdownWatcher();
+            zk = new ZooKeeper(qu.getConnString(),
+                    CONNECTION_TIMEOUT, watcher, true);
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+            // if we don't suspend a peer it will rejoin a quorum
+            qu.getPeer(1).peer.suspend();
+            peerSuspended = true;
+
+            // start two servers to form a quorum; client should detect this and
+            // connect to one of them
+            watcher.reset();
+            qu.start(2);
+            qu.start(3);
+            ClientBase.waitForServerUp(qu.getConnString(), 2000);
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } finally {
+            try {
+                if (peerSuspended) {
+                    // resume poor fellow; done here so the peer is not left
+                    // suspended when a step above fails
+                    qu.getPeer(1).peer.resume();
+                }
+            } finally {
+                zlogger.removeAppender(appender);
+                if (zk != null) {
+                    // the handle was previously never closed
+                    zk.close();
+                }
+            }
+        }
+
+        os.close();
+        LineNumberReader r = new LineNumberReader(new StringReader(os
+                .toString()));
+        String line;
+        Pattern p = Pattern.compile(".*Majority server found.*");
+        boolean found = false;
+        while ((line = r.readLine()) != null) {
+            if (p.matcher(line).matches()) {
+                found = true;
+                break;
+            }
+        }
+        Assert.assertTrue(
+                "Majority server wasn't found while connected to r/o server",
+                found);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java
new file mode 100644
index 0000000..5eda4b0
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReconfigExceptionTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(ReconfigExceptionTest.class);
+    private static String authProvider = "zookeeper.DigestAuthenticationProvider.superDigest";
+    // Use DigestAuthenticationProvider.base64Encode or
+    // run ZooKeeper jar with org.apache.zookeeper.server.auth.DigestAuthenticationProvider to generate password.
+    // An example:
+    // java -cp zookeeper-3.6.0-SNAPSHOT.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.5.jar:
+    // lib/slf4j-api-1.7.5.jar org.apache.zookeeper.server.auth.DigestAuthenticationProvider super:test
+    // The password here is 'test'.
+    private static String superDigest = "super:D/InIHSb7yEEbrWz8b9l71RjZJU=";
+    private QuorumUtil qu;
+    private ZooKeeperAdmin zkAdmin;
+
+    /**
+     * Prepares each test: sets the super-digest system property, enables
+     * reconfig through {@code QuorumPeerConfig}, starts all servers of a new
+     * {@code QuorumUtil(1)} and opens a connected admin handle.
+     *
+     * @throws InterruptedException propagated from the startup and connect calls
+     */
+    @Before
+    public void setup() throws InterruptedException {
+        System.setProperty(authProvider, superDigest);
+        QuorumPeerConfig.setReconfigEnabled(true);
+
+        // Get a three server quorum.
+        qu = new QuorumUtil(1);
+        qu.disableJMXTest = true;
+
+        try {
+            qu.startAll();
+        } catch (IOException e) {
+            // same failure type and message as Assert.fail, but the
+            // IOException is kept as the cause for diagnosis
+            throw new AssertionError("Fail to start quorum servers.", e);
+        }
+
+        resetZKAdmin();
+    }
+
+    /**
+     * Cleans up after each test: clears the super-digest system property,
+     * tears down the quorum and closes the admin handle. Failures of either
+     * cleanup step are logged and do not fail the test.
+     */
+    @After
+    public void tearDown() throws Exception {
+        System.clearProperty(authProvider);
+        try {
+            if (qu != null) {
+                qu.tearDown();
+            }
+        } catch (Exception e) {
+            LOG.warn("Ignoring failure while tearing down the quorum", e);
+        } finally {
+            // runs even when the quorum teardown fails, so the admin handle
+            // is always released
+            try {
+                if (zkAdmin != null) {
+                    zkAdmin.close();
+                }
+            } catch (Exception e) {
+                LOG.warn("Ignoring failure while closing the admin handle", e);
+            }
+        }
+    }
+
+    /**
+     * Turns reconfig off through {@code QuorumPeerConfig} and expects
+     * {@code reconfigPort()} to fail with {@code RECONFIGDISABLED}.
+     */
+    @Test(timeout = 10000)
+    public void testReconfigDisabled() throws InterruptedException {
+        QuorumPeerConfig.setReconfigEnabled(false);
+        try {
+            reconfigPort();
+            Assert.fail("Reconfig should be disabled.");
+        } catch (KeeperException e) {
+            // assertEquals reports the actual code when it differs
+            Assert.assertEquals(KeeperException.Code.RECONFIGDISABLED, e.code());
+        }
+    }
+
+    /**
+     * Calls {@code reconfigPort()} without adding any auth info and expects
+     * it to fail with {@code NOAUTH}.
+     */
+    @Test(timeout = 10000)
+    public void testReconfigFailWithoutAuth() throws InterruptedException {
+        try {
+            reconfigPort();
+            Assert.fail("Reconfig should fail without auth.");
+        } catch (KeeperException e) {
+            // However a failure is still expected as user is not authenticated, so ACL check will fail.
+            // assertEquals reports the actual code when it differs
+            Assert.assertEquals(KeeperException.Code.NOAUTH, e.code());
+        }
+    }
+
+    /**
+     * Adds the "super:test" digest auth info to the admin handle and expects
+     * {@code reconfigPort()} to succeed.
+     */
+    @Test(timeout = 10000)
+    public void testReconfigEnabledWithSuperUser() throws InterruptedException {
+        try {
+            zkAdmin.addAuthInfo("digest", "super:test".getBytes());
+            Assert.assertTrue(reconfigPort());
+        } catch (KeeperException e) {
+            // same failure message as before, with the exception kept as the
+            // cause so its stack trace is not lost
+            throw new AssertionError(
+                    "Reconfig should not fail, but failed with exception : " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Authenticates as "user:test" on a fresh admin handle, without setting
+     * any ACL for that user, and expects {@code reconfigPort()} to fail with
+     * {@code NOAUTH}.
+     */
+    @Test(timeout = 10000)
+    public void testReconfigFailWithAuthWithNoACL() throws InterruptedException {
+        resetZKAdmin();
+
+        try {
+            zkAdmin.addAuthInfo("digest", "user:test".getBytes());
+            reconfigPort();
+            Assert.fail("Reconfig should fail without a valid ACL associated with user.");
+        } catch (KeeperException e) {
+            // Again failure is expected because no ACL is associated with this user.
+            // assertEquals reports the actual code when it differs
+            Assert.assertEquals(KeeperException.Code.NOAUTH, e.code());
+        }
+    }
+
+    /**
+     * As super user, sets a READ-only ACL for "user" on
+     * {@code ZooDefs.CONFIG_NODE}; then reconnects as that user and expects
+     * {@code reconfigPort()} to fail with {@code NOAUTH}.
+     */
+    @Test(timeout = 10000)
+    public void testReconfigEnabledWithAuthAndWrongACL() throws InterruptedException {
+        resetZKAdmin();
+
+        try {
+            zkAdmin.addAuthInfo("digest", "super:test".getBytes());
+            // There is an ACL, however the permission is wrong - WRITE permission is needed at least.
+            ArrayList<ACL> acls = new ArrayList<ACL>(
+                    Collections.singletonList(
+                            new ACL(ZooDefs.Perms.READ,
+                                    new Id("digest", "user:tl+z3z0vO6PfPfEENfLF96E6pM0="/* password is test */))));
+            zkAdmin.setACL(ZooDefs.CONFIG_NODE, acls, -1);
+            // switch from the super user to the restricted user
+            resetZKAdmin();
+            zkAdmin.addAuthInfo("digest", "user:test".getBytes());
+            reconfigPort();
+            Assert.fail("Reconfig should fail with an ACL that is read only!");
+        } catch (KeeperException e) {
+            // assertEquals reports the actual code when it differs
+            Assert.assertEquals(KeeperException.Code.NOAUTH, e.code());
+        }
+    }
+
+    /**
+     * As super user, sets a WRITE ACL for "user" on
+     * {@code ZooDefs.CONFIG_NODE}; then reconnects as that user and expects
+     * {@code reconfigPort()} to succeed.
+     */
+    @Test(timeout = 10000)
+    public void testReconfigEnabledWithAuthAndACL() throws InterruptedException {
+        resetZKAdmin();
+
+        try {
+            zkAdmin.addAuthInfo("digest", "super:test".getBytes());
+            ArrayList<ACL> acls = new ArrayList<ACL>(
+                    Collections.singletonList(
+                            new ACL(ZooDefs.Perms.WRITE,
+                            new Id("digest", "user:tl+z3z0vO6PfPfEENfLF96E6pM0="/* password is test */))));
+            zkAdmin.setACL(ZooDefs.CONFIG_NODE, acls, -1);
+            // switch from the super user to the user that was granted WRITE
+            resetZKAdmin();
+            zkAdmin.addAuthInfo("digest", "user:test".getBytes());
+            Assert.assertTrue(reconfigPort());
+        } catch (KeeperException e) {
+            // same failure message as before, with the exception kept as the
+            // cause so its stack trace is not lost
+            throw new AssertionError(
+                    "Reconfig should not fail, but failed with exception : " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Utility method that recreates the ZooKeeperAdmin handle (closing the
+     * previous one, if any) against the client port of peer 1, and waits for
+     * the new handle to connect.
+     *
+     * @throws InterruptedException if closing the old handle or waiting for
+     *         the connection is interrupted
+     */
+    private void resetZKAdmin() throws InterruptedException {
+        String cnxString;
+        ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher();
+        try {
+            cnxString = "127.0.0.1:" + qu.getPeer(1).peer.getClientPort();
+            if (zkAdmin != null) {
+                zkAdmin.close();
+            }
+            zkAdmin = new ZooKeeperAdmin(cnxString,
+                    ClientBase.CONNECTION_TIMEOUT, watcher);
+        } catch (IOException e) {
+            // same failure type and message as Assert.fail, with the cause kept
+            throw new AssertionError("Fail to create ZooKeeperAdmin handle.", e);
+        }
+
+        try {
+            watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        } catch (TimeoutException e) {
+            // Only the timeout is a test failure. An InterruptedException is
+            // left to propagate, as the method signature already declares it.
+            throw new AssertionError("ZooKeeper admin client can not connect to " + cnxString, e);
+        }
+    }
+
+    private boolean reconfigPort() throws KeeperException, InterruptedException {
+        List<String> joiningServers = new ArrayList<String>();
+        int leaderId = 1;
+        while (qu.getPeer(leaderId).peer.leader == null)
+            leaderId++;
+        int followerId = leaderId == 1 ? 2 : 1;
+        joiningServers.add("server." + followerId + "=localhost:"
+                + qu.getPeer(followerId).peer.getQuorumAddress().getPort() /*quorum port*/
+                + ":" + qu.getPeer(followerId).peer.getElectionAddress().getPort() /*election port*/
+                + ":participant;localhost:" + PortAssignment.unique()/* new client port */);
+        zkAdmin.reconfigure(joiningServers, null, null, -1, new Stat());
+        return true;
+    }
+}


Mime
View raw message