zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [08/51] [partial] zookeeper git commit: ZOOKEEPER-3032: MAVEN MIGRATION - move java server, client
Date Fri, 05 Oct 2018 12:26:01 GMT
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java b/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
deleted file mode 100644
index 9546c25..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server.quorum;
-
-import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-import org.apache.zookeeper.test.ClientBase;
-import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.security.sasl.SaslException;
-
-public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
-    private static int SERVER_COUNT = 3;
-    private MainThread[] mt = new MainThread[SERVER_COUNT];
-
-    /**
-     * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2355.
-     * ZooKeeper ephemeral node is never deleted if follower fail while reading
-     * the proposal packet.
-     */
-
-    @Test(timeout = 120000)
-    public void testEphemeralNodeDeletion() throws Exception {
-        final int clientPorts[] = new int[SERVER_COUNT];
-        StringBuilder sb = new StringBuilder();
-        String server;
-
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            clientPorts[i] = PortAssignment.unique();
-            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
-                    + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
-                    + clientPorts[i];
-            sb.append(server + "\n");
-        }
-        String currentQuorumCfgSection = sb.toString();
-        // start all the servers
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
-                    false) {
-                @Override
-                public TestQPMain getTestQPMain() {
-                    return new MockTestQPMain();
-                }
-            };
-            mt[i].start();
-        }
-
-        // ensure all servers started
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            Assert.assertTrue("waiting for server " + i + " being up",
-                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
-                            CONNECTION_TIMEOUT));
-        }
-
-        CountdownWatcher watch = new CountdownWatcher();
-        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[1],
-                ClientBase.CONNECTION_TIMEOUT, watch);
-        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
-
-        /**
-         * now the problem scenario starts
-         */
-
-        Stat firstEphemeralNode = new Stat();
-
-        // 1: create ephemeral node
-        String nodePath = "/e1";
-        zk.create(nodePath, "1".getBytes(), Ids.OPEN_ACL_UNSAFE,
-                CreateMode.EPHEMERAL, firstEphemeralNode);
-        assertEquals("Current session and ephemeral owner should be same",
-                zk.getSessionId(), firstEphemeralNode.getEphemeralOwner());
-
-        // 2: inject network problem in one of the follower
-        CustomQuorumPeer follower = (CustomQuorumPeer) getByServerState(mt,
-                ServerState.FOLLOWING);
-        follower.setInjectError(true);
-
-        // 3: close the session so that ephemeral node is deleted
-        zk.close();
-
-        // remove the error
-        follower.setInjectError(false);
-
-        Assert.assertTrue("Faulted Follower should have joined quorum by now",
-                ClientBase.waitForServerUp(
-                        "127.0.0.1:" + follower.getClientPort(),
-                        CONNECTION_TIMEOUT));
-
-        QuorumPeer leader = getByServerState(mt, ServerState.LEADING);
-        assertNotNull("Leader should not be null", leader);
-        Assert.assertTrue("Leader must be running", ClientBase.waitForServerUp(
-                "127.0.0.1:" + leader.getClientPort(), CONNECTION_TIMEOUT));
-
-        watch = new CountdownWatcher();
-        zk = new ZooKeeper("127.0.0.1:" + leader.getClientPort(),
-                ClientBase.CONNECTION_TIMEOUT, watch);
-        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
-
-        Stat exists = zk.exists(nodePath, false);
-        assertNull("Node must have been deleted from leader", exists);
-
-        CountdownWatcher followerWatch = new CountdownWatcher();
-        ZooKeeper followerZK = new ZooKeeper(
-                "127.0.0.1:" + follower.getClientPort(),
-                ClientBase.CONNECTION_TIMEOUT, followerWatch);
-        followerWatch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
-        Stat nodeAtFollower = followerZK.exists(nodePath, false);
-
-        // Problem 1: Follower had one extra ephemeral node /e1
-        assertNull("ephemeral node must not exist", nodeAtFollower);
-
-        // Create the node with another session
-        Stat currentEphemeralNode = new Stat();
-        zk.create(nodePath, "2".getBytes(), Ids.OPEN_ACL_UNSAFE,
-                CreateMode.EPHEMERAL, currentEphemeralNode);
-
-        // close the session and newly created ephemeral node should be deleted
-        zk.close();
-        
-        SyncCallback cb = new SyncCallback();
-        followerZK.sync(nodePath, cb, null);
-        cb.sync.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
-        
-        nodeAtFollower = followerZK.exists(nodePath, false);
-
-        // Problem 2: Before fix, after session close the ephemeral node
-        // was not getting deleted. But now after the fix after session close
-        // ephemeral node is getting deleted.
-        assertNull("After session close ephemeral node must be deleted",
-                nodeAtFollower);
-        followerZK.close();
-    }
-
-    @After
-    public void tearDown() {
-        // stop all severs
-        for (int i = 0; i < mt.length; i++) {
-            try {
-                mt[i].shutdown();
-            } catch (InterruptedException e) {
-                LOG.warn("Quorum Peer interrupted while shutting it down", e);
-            }
-        }
-    }
-
-    private QuorumPeer getByServerState(MainThread[] mt, ServerState state) {
-        for (int i = mt.length - 1; i >= 0; i--) {
-            QuorumPeer quorumPeer = mt[i].getQuorumPeer();
-            if (null != quorumPeer && state == quorumPeer.getPeerState()) {
-                return quorumPeer;
-            }
-        }
-        return null;
-    }
-
-    static class CustomQuorumPeer extends QuorumPeer {
-        private boolean injectError = false;
-
-        public CustomQuorumPeer() throws SaslException {
-
-        }
-
-        @Override
-        protected Follower makeFollower(FileTxnSnapLog logFactory)
-                throws IOException {
-            return new Follower(this, new FollowerZooKeeperServer(logFactory,
-                    this, this.getZkDb())) {
-
-                @Override
-                void readPacket(QuorumPacket pp) throws IOException {
-                    /**
-                     * In real scenario got SocketTimeoutException while reading
-                     * the packet from leader because of network problem, but
-                     * here throwing SocketTimeoutException based on whether
-                     * error is injected or not
-                     */
-                    super.readPacket(pp);
-                    if (injectError && pp.getType() == Leader.PROPOSAL) {
-                        String type = LearnerHandler.packetToString(pp);
-                        throw new SocketTimeoutException(
-                                "Socket timeout while reading the packet for operation "
-                                        + type);
-                    }
-                }
-
-            };
-        }
-
-        public void setInjectError(boolean injectError) {
-            this.injectError = injectError;
-        }
-
-    }
-
-    static class MockTestQPMain extends TestQPMain {
-        @Override
-        protected QuorumPeer getQuorumPeer() throws SaslException {
-            return new CustomQuorumPeer();
-        }
-    }
-    
-    private static class SyncCallback implements AsyncCallback.VoidCallback {
-        private final CountDownLatch sync = new CountDownLatch(1);
-        
-        @Override
-        public void processResult(int rc, String path, Object ctx) {
-        	sync.countDown();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
deleted file mode 100644
index f50d0bb..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.quorum.QuorumCnxManager;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.Vote;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-import org.apache.zookeeper.test.ClientBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FLEBackwardElectionRoundTest extends ZKTestCase {
-    protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
-
-    int count;
-    Map<Long,QuorumServer> peers;
-    File tmpdir[];
-    int port[];
-
-    QuorumCnxManager cnxManagers[];
-
-    @Before
-    public void setUp() throws Exception {
-        count = 3;
-
-        peers = new HashMap<Long,QuorumServer>(count);
-        tmpdir = new File[count];
-        port = new int[count];
-        cnxManagers = new QuorumCnxManager[count - 1];
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        for(int i = 0; i < (count - 1); i++){
-            if(cnxManagers[i] != null){
-                cnxManagers[i].halt();
-            }
-        }
-    }
-
-    /**
-     * This test is checking the following case. A server S is
-     * currently LOOKING and it receives notifications from
-     * a quorum indicating they are following S. The election
-     * round E of S is higher than the election round E' in the
-     * notification messages, so S becomes the leader and sets
-     * its epoch back to E'. In the meanwhile, one or more
-     * followers turn to LOOKING and elect S in election round E.
-     * Having leader and followers with different election rounds
-     * might prevent other servers from electing a leader because
-     * they can't get a consistent set of notifications from a
-     * quorum.
-     *
-     * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1514}
-     *
-     *
-     * @throws Exception
-     */
-
-    @Test
-    public void testBackwardElectionRound() throws Exception {
-        LOG.info("TestLE: {}, {}", getTestName(), count);
-        for(int i = 0; i < count; i++) {
-            int clientport = PortAssignment.unique();
-            peers.put(Long.valueOf(i),
-                    new QuorumServer(i,
-                            new InetSocketAddress(clientport),
-                            new InetSocketAddress(PortAssignment.unique())));
-            tmpdir[i] = ClientBase.createTmpDir();
-            port[i] = clientport;
-        }
-
-        ByteBuffer initialMsg0 = getMsg();
-        ByteBuffer initialMsg1 = getMsg();
-
-        /*
-         * Start server 0
-         */
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
-        peer.startLeaderElection();
-        FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 0);
-        thread.start();
-
-        /*
-         * Start mock server 1
-         */
-        QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
-        cnxManagers[0] = mockPeer.createCnxnManager();
-        cnxManagers[0].listener.start();
-
-        cnxManagers[0].toSend(0l, initialMsg0);
-
-        /*
-         * Start mock server 2
-         */
-        mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
-        cnxManagers[1] = mockPeer.createCnxnManager();
-        cnxManagers[1].listener.start();
-
-        cnxManagers[1].toSend(0l, initialMsg1);
-
-        /*
-         * Run another instance of leader election.
-         */
-        thread.join(5000);
-        thread = new FLETestUtils.LEThread(peer, 0);
-        thread.start();
-
-        /*
-         * Send the same messages, this time should not make 0 the leader.
-         */
-        cnxManagers[0].toSend(0l, initialMsg0);
-        cnxManagers[1].toSend(0l, initialMsg1);
-
-        thread.join(5000);
-
-        if (!thread.isAlive()) {
-            Assert.fail("Should not have joined");
-        }
-
-    }
-
-    private ByteBuffer getMsg() {
-        return FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
deleted file mode 100644
index 6583f90..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.quorum.FastLeaderElection;
-import org.apache.zookeeper.server.quorum.QuorumCnxManager;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-import org.apache.zookeeper.test.ClientBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FLELostMessageTest extends ZKTestCase {
-    protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
-
-    int count;
-    HashMap<Long,QuorumServer> peers;
-    File tmpdir[];
-    int port[];
-
-    QuorumCnxManager cnxManager;
-
-    @Before
-    public void setUp() throws Exception {
-        count = 3;
-
-        peers = new HashMap<Long,QuorumServer>(count);
-        tmpdir = new File[count];
-        port = new int[count];
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        cnxManager.halt();
-    }
-
-    @Test
-    public void testLostMessage() throws Exception {
-        LOG.info("TestLE: {}, {}", getTestName(), count);
-        for(int i = 0; i < count; i++) {
-            int clientport = PortAssignment.unique();
-            peers.put(Long.valueOf(i),
-                    new QuorumServer(i,
-                            new InetSocketAddress(clientport),
-                            new InetSocketAddress(PortAssignment.unique())));
-            tmpdir[i] = ClientBase.createTmpDir();
-            port[i] = clientport;
-        }
-
-        /*
-         * Start server 0
-         */
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
-        peer.startLeaderElection();
-        FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 1);
-        thread.start();
-
-        /*
-         * Start mock server 1
-         */
-        mockServer();
-        thread.join(5000);
-        if (thread.isAlive()) {
-            Assert.fail("Threads didn't join");
-        }
-    }
-
-    void mockServer() throws InterruptedException, IOException {
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
-        cnxManager = peer.createCnxnManager();
-        cnxManager.listener.start();
-
-        cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0, 0));
-        cnxManager.recvQueue.take();
-        cnxManager.toSend(1L, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 1, 0, 0));
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java b/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
deleted file mode 100644
index a907abd..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.nio.ByteBuffer;
-
-import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.quorum.FastLeaderElection;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.Vote;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Assert;
-
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-
-public class FLETestUtils extends ZKTestCase {
-    protected static final Logger LOG = LoggerFactory.getLogger(FLETestUtils.class);
-
-    /*
-     * Thread to run an instance of leader election for 
-     * a given quorum peer.
-     */
-    static class LEThread extends Thread {
-        private int i;
-        private QuorumPeer peer;
-
-        LEThread(QuorumPeer peer, int i) {
-            this.i = i;
-            this.peer = peer;
-            LOG.info("Constructor: {}", getName());
-
-        }
-
-        public void run() {
-            try {
-                Vote v = null;
-                peer.setPeerState(ServerState.LOOKING);
-                LOG.info("Going to call leader election: {}", i);
-                v = peer.getElectionAlg().lookForLeader();
-
-                if (v == null) {
-                    Assert.fail("Thread " + i + " got a null vote");
-                }
-
-                /*
-                 * A real zookeeper would take care of setting the current vote. Here
-                 * we do it manually.
-                 */
-                peer.setCurrentVote(v);
-
-                LOG.info("Finished election: {}, {}", i, v.getId());
-
-                Assert.assertTrue("State is not leading.", peer.getPeerState() == ServerState.LEADING);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-            LOG.info("Joining");
-        }
-    }
-
-    /*
-     * Creates a leader election notification message.
-     */
-    static ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
-        return FastLeaderElection.buildMsg(state, leader, zxid, 1, epoch);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
deleted file mode 100644
index c4ca8a8..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.security.sasl.SaslException;
-
-import org.apache.jute.OutputArchive;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.Op;
-
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.DataNode;
-import org.apache.zookeeper.server.DataTree;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.test.ClientBase;
-
-import org.junit.Assert;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test cases used to catch corner cases due to fuzzy snapshot.
- */
-public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FuzzySnapshotRelatedTest.class);
-
-    MainThread[] mt = null;
-    ZooKeeper[] zk = null;
-    int[] clientPorts = null;
-    int leaderId;
-    int followerA;
-
-    @Before
-    public void setup() throws Exception {
-        LOG.info("Start up a 3 server quorum");
-        final int ENSEMBLE_SERVERS = 3;
-        clientPorts = new int[ENSEMBLE_SERVERS];
-        StringBuilder sb = new StringBuilder();
-        String server;
-
-        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
-            clientPorts[i] = PortAssignment.unique();
-            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
-                    + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
-                    + clientPorts[i];
-            sb.append(server + "\n");
-        }
-        String currentQuorumCfgSection = sb.toString();
-
-        // start servers
-        mt = new MainThread[ENSEMBLE_SERVERS];
-        zk = new ZooKeeper[ENSEMBLE_SERVERS];
-        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
-            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
-                    false) {
-                @Override
-                public TestQPMain getTestQPMain() {
-                    return new CustomizedQPMain();
-                }
-            };
-            mt[i].start();
-            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
-                    ClientBase.CONNECTION_TIMEOUT, this);
-        }
-        QuorumPeerMainTest.waitForAll(zk, States.CONNECTED);
-        LOG.info("all servers started");
-
-        leaderId = -1;
-        followerA = -1;
-        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
-            if (mt[i].main.quorumPeer.leader != null) {
-                leaderId = i;
-            } else if (followerA == -1) {
-                followerA = i;
-            }
-        }
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (mt != null) {
-            for (MainThread t: mt) {
-                t.shutdown();
-            }
-        }
-
-        if (zk != null) {
-            for (ZooKeeper z: zk) {
-                z.close();
-            }
-        }
-    }
-
-    @Test
-    public void testMultiOpConsistency() throws Exception {
-        LOG.info("Create a parent node");
-        final String path = "/testMultiOpConsistency";
-        createEmptyNode(zk[followerA], path);
-
-        LOG.info("Hook to catch the 2nd sub create node txn in multi-op");
-        CustomDataTree dt =
-                (CustomDataTree) mt[followerA].main.quorumPeer.getZkDb().getDataTree();
-
-        final ZooKeeperServer zkServer = mt[followerA].main.quorumPeer.getActiveServer();
-
-        String node1 = path + "/1";
-        String node2 = path + "/2";
-
-        dt.addNodeCreateListener(node2, new NodeCreateListener() {
-            @Override
-            public void process(String path) {
-                LOG.info("Take a snapshot");
-                zkServer.takeSnapshot(true);
-            }
-        });
-
-        LOG.info("Issue a multi op to create 2 nodes");
-        zk[followerA].multi(Arrays.asList(
-            Op.create(node1, node1.getBytes(),
-                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
-            Op.create(node2, node2.getBytes(),
-                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT))
-        );
-
-        LOG.info("Restart the server");
-        mt[followerA].shutdown();
-        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
-
-        mt[followerA].start();
-        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
-
-        LOG.info("Make sure the node consistent with leader");
-        Assert.assertEquals(new String(zk[leaderId].getData(node2, null, null)),
-                new String(zk[followerA].getData(node2, null, null)));
-    }
-
-    /**
-     * It's possibel during SNAP sync, the parent is serialized before the
-     * child get deleted during sending the snapshot over.
-     *
-     * In which case, we need to make sure the pzxid get correctly updated
-     * when applying the txns received.
-     */
-    @Test
-    public void testPZxidUpdatedDuringSnapSyncing() throws Exception {
-        LOG.info("Enable force snapshot sync");
-        System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
-
-        final String parent = "/testPZxidUpdatedWhenDeletingNonExistNode";
-        final String child = parent + "/child";
-        createEmptyNode(zk[leaderId], parent);
-        createEmptyNode(zk[leaderId], child);
-
-        LOG.info("shutdown follower {}", followerA);
-        mt[followerA].shutdown();
-        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
-
-        LOG.info("Set up ZKDatabase to catch the node serializing in DataTree");
-        addSerializeListener(leaderId, parent, child);
-
-        LOG.info("Restart follower A to trigger a SNAP sync with leader");
-        mt[followerA].start();
-        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
-
-        LOG.info("Check and make sure the pzxid of the parent is the same " +
-                "on leader and follower A");
-        compareStat(parent, leaderId, followerA);
-    }
-
-    /**
-     * It's possible during taking fuzzy snapshot, the parent is serialized
-     * before the child get deleted in the fuzzy range.
-     *
-     * In which case, we need to make sure the pzxid get correctly updated
-     * when replaying the txns.
-     */
-    @Test
-    public void testPZxidUpdatedWhenLoadingSnapshot() throws Exception {
-
-        final String parent = "/testPZxidUpdatedDuringTakingSnapshot";
-        final String child = parent + "/child";
-        createEmptyNode(zk[followerA], parent);
-        createEmptyNode(zk[followerA], child);
-
-        LOG.info("Set up ZKDatabase to catch the node serializing in DataTree");
-        addSerializeListener(followerA, parent, child);
-
-        LOG.info("Take snapshot on follower A");
-        ZooKeeperServer zkServer = mt[followerA].main.quorumPeer.getActiveServer();
-        zkServer.takeSnapshot(true);
-
-        LOG.info("Restarting follower A to load snapshot");
-        mt[followerA].shutdown();
-        mt[followerA].start();
-        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
-
-        LOG.info("Check and make sure the pzxid of the parent is the same " +
-                "on leader and follower A");
-        compareStat(parent, leaderId, followerA);
-    }
-
-    private void addSerializeListener(int sid, String parent, String child) {
-        final ZooKeeper zkClient = zk[followerA];
-        CustomDataTree dt =
-                (CustomDataTree) mt[sid].main.quorumPeer.getZkDb().getDataTree();
-        dt.addListener(parent, new NodeSerializeListener() {
-            @Override
-            public void nodeSerialized(String path) {
-                try {
-                    zkClient.delete(child, -1);
-                    LOG.info("Deleted the child node after the parent is serialized");
-                } catch (Exception e) {
-                    LOG.error("Error when deleting node {}", e);
-                }
-            }
-        });
-    }
-
-    private void compareStat(String path, int sid, int compareWithSid) throws Exception{
-        Stat stat1 = new Stat();
-        zk[sid].getData(path, null, stat1);
-
-        Stat stat2 = new Stat();
-        zk[compareWithSid].getData(path, null, stat2);
-
-        Assert.assertEquals(stat1, stat2);
-    }
-
-    @Test
-    public void testGlobalSessionConsistency() throws Exception {
-        LOG.info("Hook to catch the commitSession event on followerA");
-        CustomizedQPMain followerAMain = (CustomizedQPMain) mt[followerA].main;
-        final ZooKeeperServer zkServer = followerAMain.quorumPeer.getActiveServer();
-
-        // only take snapshot for the next global session we're going to create
-        final AtomicBoolean shouldTakeSnapshot = new AtomicBoolean(true);
-        followerAMain.setCommitSessionListener(new CommitSessionListener() {
-            @Override
-            public void process(long sessionId) {
-                LOG.info("Take snapshot");
-                if (shouldTakeSnapshot.getAndSet(false)) {
-                    zkServer.takeSnapshot(true);
-                }
-            }
-        });
-
-        LOG.info("Create a global session");
-        ZooKeeper globalClient = new ZooKeeper(
-                "127.0.0.1:" + clientPorts[followerA],
-                ClientBase.CONNECTION_TIMEOUT, this);
-        QuorumPeerMainTest.waitForOne(globalClient, States.CONNECTED);
-
-        LOG.info("Restart followerA to load the data from disk");
-        mt[followerA].shutdown();
-        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
-
-        mt[followerA].start();
-        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
-
-        LOG.info("Make sure the global sessions are consistent with leader");
-
-        Map<Long, Integer> globalSessionsOnLeader =
-                mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
-        if (mt[followerA].main.quorumPeer == null) {
-            LOG.info("quorumPeer is null");
-        }
-        if (mt[followerA].main.quorumPeer.getZkDb() == null) {
-            LOG.info("zkDb is null");
-        }
-        Map<Long, Integer> globalSessionsOnFollowerA =
-                mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
-        LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(),
-                globalSessionsOnFollowerA.keySet());
-        Assert.assertTrue(globalSessionsOnFollowerA.keySet().containsAll(
-                  globalSessionsOnLeader.keySet()));
-    }
-
-    private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
-        zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    }
-
-    static interface NodeCreateListener {
-        public void process(String path);
-
-    }
-
-    static class CustomDataTree extends DataTree {
-        Map<String, NodeCreateListener> nodeCreateListeners =
-                new HashMap<String, NodeCreateListener>();
-        Map<String, NodeSerializeListener> listeners =
-                new HashMap<String, NodeSerializeListener>();
-
-        @Override
-        public void serializeNodeData(OutputArchive oa, String path,
-                DataNode node) throws IOException {
-            super.serializeNodeData(oa, path, node);
-            NodeSerializeListener listener = listeners.get(path);
-            if (listener != null) {
-                listener.nodeSerialized(path);
-            }
-        }
-
-        public void addListener(String path, NodeSerializeListener listener) {
-            listeners.put(path, listener);
-        }
-
-        @Override
-        public void createNode(final String path, byte data[], List<ACL> acl,
-                           long ephemeralOwner, int parentCVersion, long zxid,
-                           long time, Stat outputStat)
-                throws NoNodeException, NodeExistsException {
-            NodeCreateListener listener = nodeCreateListeners.get(path);
-            if (listener != null) {
-                listener.process(path);
-            }
-            super.createNode(path, data, acl, ephemeralOwner, parentCVersion,
-                    zxid, time, outputStat);
-        }
-
-        public void addNodeCreateListener(String path, NodeCreateListener listener) {
-            nodeCreateListeners.put(path, listener);
-        }
-    }
-
-    static interface NodeSerializeListener {
-        public void nodeSerialized(String path);
-    }
-
-    static interface CommitSessionListener {
-        public void process(long sessionId);
-    }
-
-    static class CustomizedQPMain extends TestQPMain {
-        CommitSessionListener commitSessionListener;
-
-        public void setCommitSessionListener(CommitSessionListener listener) {
-            this.commitSessionListener = listener;
-        }
-
-        @Override
-        protected QuorumPeer getQuorumPeer() throws SaslException {
-            return new QuorumPeer() {
-                @Override
-                public void setZKDatabase(ZKDatabase database) {
-                    super.setZKDatabase(new ZKDatabase(this.getTxnFactory()) {
-                        @Override
-                        public DataTree createDataTree() {
-                            return new CustomDataTree();
-                        }
-                    });
-                }
-
-                @Override
-                protected Follower makeFollower(FileTxnSnapLog logFactory)
-                        throws IOException {
-                    return new Follower(this, new FollowerZooKeeperServer(
-                            logFactory, this, this.getZkDb()) {
-                        @Override
-                        public void createSessionTracker() {
-                            sessionTracker = new LearnerSessionTracker(
-                                    this, getZKDatabase().getSessionWithTimeOuts(),
-                                    this.tickTime, self.getId(),
-                                    self.areLocalSessionsEnabled(),
-                                    getZooKeeperServerListener()) {
-
-                                public synchronized boolean commitSession(
-                                        long sessionId, int sessionTimeout) {
-                                    if (commitSessionListener != null) {
-                                        commitSessionListener.process(sessionId);
-                                    }
-                                    return super.commitSession(sessionId, sessionTimeout);
-                                }
-                            };
-                        }
-                    });
-                }
-            };
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LeaderBeanTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
deleted file mode 100644
index b017551..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.jute.OutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
-import org.apache.zookeeper.server.util.SerializeUtils;
-import org.apache.zookeeper.test.ClientBase;
-import org.apache.zookeeper.txn.TxnHeader;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class LeaderBeanTest {
-    private Leader leader;
-    private LeaderBean leaderBean;
-    private FileTxnSnapLog fileTxnSnapLog;
-    private LeaderZooKeeperServer zks;
-    private QuorumPeer qp;
-
-    @Before
-    public void setUp() throws IOException {
-        qp = new QuorumPeer();
-        QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
-        qp.setQuorumVerifier(quorumVerifierMock, false);
-        File tmpDir = ClientBase.createEmptyTestDir();
-        fileTxnSnapLog = new FileTxnSnapLog(new File(tmpDir, "data"),
-                new File(tmpDir, "data_txnlog"));
-        ZKDatabase zkDb = new ZKDatabase(fileTxnSnapLog);
-
-        zks = new LeaderZooKeeperServer(fileTxnSnapLog, qp, zkDb);
-        leader = new Leader(qp, zks);
-        leaderBean = new LeaderBean(leader, zks);
-    }
-
-    @After
-    public void tearDown() throws IOException {
-        fileTxnSnapLog.close();
-    }
-
-    @Test
-    public void testGetName() {
-        assertEquals("Leader", leaderBean.getName());
-    }
-
-    @Test
-    public void testGetCurrentZxid() {
-        // Arrange
-        zks.setZxid(1);
-
-        // Assert
-        assertEquals("0x1", leaderBean.getCurrentZxid());
-    }
-
-    @Test
-    public void testGetElectionTimeTaken() {
-        // Arrange
-        qp.setElectionTimeTaken(1);
-
-        // Assert
-        assertEquals(1, leaderBean.getElectionTimeTaken());
-    }
-
-    @Test
-    public void testGetProposalSize() throws IOException, Leader.XidRolloverException {
-        // Arrange
-        Request req = createMockRequest();
-
-        // Act
-        leader.propose(req);
-
-        // Assert
-        byte[] data = SerializeUtils.serializeRequest(req);
-        assertEquals(data.length, leaderBean.getLastProposalSize());
-        assertEquals(data.length, leaderBean.getMinProposalSize());
-        assertEquals(data.length, leaderBean.getMaxProposalSize());
-    }
-
-    @Test
-    public void testResetProposalStats() throws IOException, Leader.XidRolloverException {
-        // Arrange
-        int initialProposalSize = leaderBean.getLastProposalSize();
-        Request req = createMockRequest();
-
-        // Act
-        leader.propose(req);
-
-        // Assert
-        assertNotEquals(initialProposalSize, leaderBean.getLastProposalSize());
-        leaderBean.resetProposalStatistics();
-        assertEquals(initialProposalSize, leaderBean.getLastProposalSize());
-        assertEquals(initialProposalSize, leaderBean.getMinProposalSize());
-        assertEquals(initialProposalSize, leaderBean.getMaxProposalSize());
-    }
-
-    private Request createMockRequest() throws IOException {
-        TxnHeader header = mock(TxnHeader.class);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                Object[] args = invocation.getArguments();
-                OutputArchive oa = (OutputArchive) args[0];
-                oa.writeString("header", "test");
-                return null;
-            }
-        }).when(header).serialize(any(OutputArchive.class), anyString());
-        Record txn = mock(Record.class);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                Object[] args = invocation.getArguments();
-                OutputArchive oa = (OutputArchive) args[0];
-                oa.writeString("record", "test");
-                return null;
-            }
-        }).when(txn).serialize(any(OutputArchive.class), anyString());
-        return new Request(1, 2, 3, header, txn, 4);
-    }
-
-    @Test
-    public void testFollowerInfo() throws IOException {
-        LearnerHandler follower = mock(LearnerHandler.class);
-        when(follower.getLearnerType()).thenReturn(LearnerType.PARTICIPANT);
-        when(follower.toString()).thenReturn("1");
-        leader.addLearnerHandler(follower);
-
-        assertEquals("1\n", leaderBean.followerInfo());
-
-        LearnerHandler observer = mock(LearnerHandler.class);
-        when(observer.getLearnerType()).thenReturn(LearnerType.OBSERVER);
-        when(observer.toString()).thenReturn("2");
-        leader.addLearnerHandler(observer);
-
-        assertEquals("1\n", leaderBean.followerInfo());
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
deleted file mode 100644
index 2548aca..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.test.ClientBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
-import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
-
-public class LeaderWithObserverTest {
-
-    QuorumPeer peer;
-    Leader leader;
-    File tmpDir;
-    long participantId;
-    long observerId;
-
-    @Before
-    public void setUp() throws Exception {
-        tmpDir = ClientBase.createTmpDir();
-        peer = createQuorumPeer(tmpDir);
-        participantId = 1;
-        Map<Long, QuorumPeer.QuorumServer> peers = peer.getQuorumVerifier().getAllMembers();
-        observerId = peers.size();
-        leader = createLeader(tmpDir, peer);
-        peer.leader = leader;
-        peers.put(observerId, new QuorumPeer.QuorumServer(
-                observerId, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
-                new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
-                new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
-                QuorumPeer.LearnerType.OBSERVER));
-
-        // these tests are serial, we can speed up InterruptedException
-        peer.tickTime = 1;
-    }
-
-    @After
-    public void tearDown(){
-        leader.shutdown("end of test");
-        tmpDir.delete();
-    }
-
-    @Test
-    public void testGetEpochToPropose() throws Exception {
-        long lastAcceptedEpoch = 5;
-        peer.setAcceptedEpoch(5);
-
-        Assert.assertEquals("Unexpected vote in connectingFollowers", 0, leader.connectingFollowers.size());
-        Assert.assertTrue(leader.waitingForNewEpoch);
-        try {
-            // Leader asks for epoch (mocking Leader.lead behavior)
-            // First add to connectingFollowers
-            leader.getEpochToPropose(peer.getId(), lastAcceptedEpoch);
-        } catch (InterruptedException e) {
-            // ignore timeout
-        }
-
-        Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size());
-        Assert.assertEquals("Leader shouldn't set new epoch until quorum of participants is in connectingFollowers",
-                lastAcceptedEpoch, peer.getAcceptedEpoch());
-        Assert.assertTrue(leader.waitingForNewEpoch);
-        try {
-            // Observer asks for epoch (mocking LearnerHandler behavior)
-            leader.getEpochToPropose(observerId, lastAcceptedEpoch);
-        } catch (InterruptedException e) {
-            // ignore timeout
-        }
-
-        Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size());
-        Assert.assertEquals("Leader shouldn't set new epoch after observer asks for epoch",
-                lastAcceptedEpoch, peer.getAcceptedEpoch());
-        Assert.assertTrue(leader.waitingForNewEpoch);
-        try {
-            // Now participant asks for epoch (mocking LearnerHandler behavior). Second add to connectingFollowers.
-            // Triggers verifier.containsQuorum = true
-            leader.getEpochToPropose(participantId, lastAcceptedEpoch);
-        } catch (Exception e) {
-            Assert.fail("Timed out in getEpochToPropose");
-        }
-
-        Assert.assertEquals("Unexpected vote in connectingFollowers", 2, leader.connectingFollowers.size());
-        Assert.assertEquals("Leader should record next epoch", lastAcceptedEpoch + 1, peer.getAcceptedEpoch());
-        Assert.assertFalse(leader.waitingForNewEpoch);
-    }
-
-    @Test
-    public void testWaitForEpochAck() throws Exception {
-        // things needed for waitForEpochAck to run (usually in leader.lead(), but we're not running leader here)
-        leader.leaderStateSummary = new StateSummary(leader.self.getCurrentEpoch(), leader.zk.getLastProcessedZxid());
-
-        Assert.assertEquals("Unexpected vote in electingFollowers", 0, leader.electingFollowers.size());
-        Assert.assertFalse(leader.electionFinished);
-        try {
-            // leader calls waitForEpochAck, first add to electingFollowers
-            leader.waitForEpochAck(peer.getId(), new StateSummary(0, 0));
-        }  catch (InterruptedException e) {
-            // ignore timeout
-        }
-
-        Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size());
-        Assert.assertFalse(leader.electionFinished);
-        try {
-            // observer calls waitForEpochAck, should fail verifier.containsQuorum
-            leader.waitForEpochAck(observerId, new StateSummary(0, 0));
-        }  catch (InterruptedException e) {
-            // ignore timeout
-        }
-
-        Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size());
-        Assert.assertFalse(leader.electionFinished);
-        try {
-            // second add to electingFollowers, verifier.containsQuorum=true, waitForEpochAck returns without exceptions
-            leader.waitForEpochAck(participantId, new StateSummary(0, 0));
-            Assert.assertEquals("Unexpected vote in electingFollowers", 2, leader.electingFollowers.size());
-            Assert.assertTrue(leader.electionFinished);
-        } catch (Exception e) {
-            Assert.fail("Timed out in waitForEpochAck");
-        }
-    }
-
-    @Test
-    public void testWaitForNewLeaderAck() throws Exception {
-        long zxid = leader.zk.getZxid();
-
-        // things needed for waitForNewLeaderAck to run (usually in leader.lead(), but we're not running leader here)
-        leader.newLeaderProposal.packet = new QuorumPacket(0, zxid, null, null);
-        leader.newLeaderProposal.addQuorumVerifier(peer.getQuorumVerifier());
-
-        Set<Long> ackSet = leader.newLeaderProposal.qvAcksetPairs.get(0).getAckset();
-        Assert.assertEquals("Unexpected vote in ackSet", 0, ackSet.size());
-        Assert.assertFalse(leader.quorumFormed);
-        try {
-            // leader calls waitForNewLeaderAck, first add to ackSet
-            leader.waitForNewLeaderAck(peer.getId(), zxid);
-        }  catch (InterruptedException e) {
-            // ignore timeout
-        }
-
-        Assert.assertEquals("Unexpected vote in ackSet", 1, ackSet.size());
-        Assert.assertFalse(leader.quorumFormed);
-        try {
-            // observer calls waitForNewLeaderAck, should fail verifier.containsQuorum
-            leader.waitForNewLeaderAck(observerId, zxid);
-        }  catch (InterruptedException e) {
-            // ignore timeout
-        }
-
-        Assert.assertEquals("Unexpected vote in ackSet", 1, ackSet.size());
-        Assert.assertFalse(leader.quorumFormed);
-        try {
-            // second add to ackSet, verifier.containsQuorum=true, waitForNewLeaderAck returns without exceptions
-            leader.waitForNewLeaderAck(participantId, zxid);
-            Assert.assertEquals("Unexpected vote in ackSet", 2, ackSet.size());
-            Assert.assertTrue(leader.quorumFormed);
-        } catch (Exception e) {
-            Assert.fail("Timed out in waitForEpochAck");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
deleted file mode 100644
index f238971..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
+++ /dev/null
@@ -1,592 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.net.Socket;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.TxnLogProposalIterator;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.server.util.ZxidUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LearnerHandlerTest extends ZKTestCase {
-    protected static final Logger LOG = LoggerFactory
-            .getLogger(LearnerHandlerTest.class);
-
-    class MockLearnerHandler extends LearnerHandler {
-        boolean threadStarted = false;
-
-        MockLearnerHandler(Socket sock, Leader leader) throws IOException {
-            super(sock, new BufferedInputStream(sock.getInputStream()), leader);
-        }
-
-        protected void startSendingPackets() {
-            threadStarted = true;
-        }
-    }
-
-    class MockZKDatabase extends ZKDatabase {
-        long lastProcessedZxid;
-        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-        LinkedList<Proposal> committedLog = new LinkedList<Leader.Proposal>();
-        LinkedList<Proposal> txnLog = new LinkedList<Leader.Proposal>();
-
-        public MockZKDatabase(FileTxnSnapLog snapLog) {
-            super(snapLog);
-        }
-
-        public long getDataTreeLastProcessedZxid() {
-            return lastProcessedZxid;
-        }
-
-        public long getmaxCommittedLog() {
-            if (!committedLog.isEmpty()) {
-                return committedLog.getLast().packet.getZxid();
-            }
-            return 0;
-        }
-
-        public long getminCommittedLog() {
-            if (!committedLog.isEmpty()) {
-                return committedLog.getFirst().packet.getZxid();
-            }
-            return 0;
-        }
-
-        public List<Proposal> getCommittedLog() {
-            return committedLog;
-        }
-
-        public ReentrantReadWriteLock getLogLock() {
-            return lock;
-        }
-
-        public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid,
-                long limit) {
-            if (peerZxid >= txnLog.peekFirst().packet.getZxid()) {
-                return txnLog.iterator();
-            } else {
-                return (new LinkedList<Proposal>()).iterator();
-            }
-
-        }
-
-        public long calculateTxnLogSizeLimit() {
-            return 1;
-        }
-    }
-
-    private MockLearnerHandler learnerHandler;
-    private Socket sock;
-
-    // Member variables for mocking Leader
-    private Leader leader;
-    private long currentZxid;
-
-    // Member variables for mocking ZkDatabase
-    private MockZKDatabase db;
-
-    @Before
-    public void setUp() throws Exception {
-        // Intercept when startForwarding is called
-        leader = mock(Leader.class);
-        when(
-                leader.startForwarding(Matchers.any(LearnerHandler.class),
-                        Matchers.anyLong())).thenAnswer(new Answer() {
-            public Object answer(InvocationOnMock invocation) {
-                currentZxid = (Long) invocation.getArguments()[1];
-                return 0;
-            }
-        });
-
-        sock = mock(Socket.class);
-
-        db = new MockZKDatabase(null);
-        learnerHandler = new MockLearnerHandler(sock, leader);
-    }
-
-    Proposal createProposal(long zxid) {
-        Proposal p = new Proposal();
-        p.packet = new QuorumPacket();
-        p.packet.setZxid(zxid);
-        p.packet.setType(Leader.PROPOSAL);
-        return p;
-    }
-
-    /**
-     * Validate that queued packets contains proposal in the following orders as
-     * a given array of zxids
-     *
-     * @param zxids
-     */
-    public void queuedPacketMatches(long[] zxids) {
-        int index = 0;
-        for (QuorumPacket qp : learnerHandler.getQueuedPackets()) {
-            if (qp.getType() == Leader.PROPOSAL) {
-                assertZxidEquals(zxids[index++], qp.getZxid());
-            }
-        }
-    }
-
-    void reset() {
-        learnerHandler.getQueuedPackets().clear();
-        learnerHandler.threadStarted = false;
-        learnerHandler.setFirstPacket(true);
-    }
-
-    /**
-     * Check if op packet (first packet in the queue) match the expected value
-     * @param type - type of packet
-     * @param zxid - zxid in the op packet
-     * @param currentZxid - last packet queued by syncFollower,
-     *                      before invoking startForwarding()
-     */
-    public void assertOpType(int type, long zxid, long currentZxid) {
-        Queue<QuorumPacket> packets = learnerHandler.getQueuedPackets();
-        assertTrue(packets.size() > 0);
-        assertEquals(type, packets.peek().getType());
-        assertZxidEquals(zxid, packets.peek().getZxid());
-        assertZxidEquals(currentZxid, this.currentZxid);
-    }
-
-    void assertZxidEquals(long expected, long value) {
-        assertEquals("Expected 0x" + Long.toHexString(expected) + " but was 0x"
-                + Long.toHexString(value), expected, value);
-    }
-
-    /**
-     * Test cases when leader has empty commitedLog
-     */
-    @Test
-    public void testEmptyCommittedLog() throws Exception {
-        long peerZxid;
-
-        // Peer has newer zxid
-        peerZxid = 3;
-        db.lastProcessedZxid = 1;
-        db.committedLog.clear();
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send TRUNC and forward any packet starting lastProcessedZxid
-        assertOpType(Leader.TRUNC, db.lastProcessedZxid, db.lastProcessedZxid);
-        reset();
-
-        // Peer is already sync
-        peerZxid = 1;
-        db.lastProcessedZxid = 1;
-        db.committedLog.clear();
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF and forward any packet starting lastProcessedZxid
-        assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
-        assertEquals(1, learnerHandler.getQueuedPackets().size());
-        reset();
-
-        // Peer has 0 zxid (new machine turn up), txnlog
-        // is disabled
-        peerZxid = 0;
-        db.setSnapshotSizeFactor(-1);
-        db.lastProcessedZxid = 1;
-        db.committedLog.clear();
-        // We send SNAP
-        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
-        assertEquals(0, learnerHandler.getQueuedPackets().size());
-        reset();
-
-    }
-
-    /**
-     * Test cases when leader has committedLog
-     */
-    @Test
-    public void testCommittedLog() throws Exception {
-        long peerZxid;
-
-        // Commit proposal may lag behind data tree, but it shouldn't affect
-        // us in any case
-        db.lastProcessedZxid = 6;
-        db.committedLog.add(createProposal(2));
-        db.committedLog.add(createProposal(3));
-        db.committedLog.add(createProposal(5));
-
-        // Peer has zxid that we have never seen
-        peerZxid = 4;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send TRUNC to 3 and forward any packet starting 5
-        assertOpType(Leader.TRUNC, 3, 5);
-        // DIFF + 1 proposals + 1 commit
-        assertEquals(3, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { 5 });
-        reset();
-
-        // Peer is within committedLog range
-        peerZxid = 2;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF and forward any packet starting lastProcessedZxid
-        assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
-                db.getmaxCommittedLog());
-        // DIFF + 2 proposals + 2 commit
-        assertEquals(5, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { 3, 5 });
-        reset();
-
-        // Peer miss the committedLog and txnlog is disabled
-        peerZxid = 1;
-        db.setSnapshotSizeFactor(-1);
-        // We send SNAP
-        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
-        assertEquals(0, learnerHandler.getQueuedPackets().size());
-        reset();
-    }
-
-    /**
-     * Test cases when txnlog is enabled
-     */
-    @Test
-    public void testTxnLog() throws Exception {
-        long peerZxid;
-        db.txnLog.add(createProposal(2));
-        db.txnLog.add(createProposal(3));
-        db.txnLog.add(createProposal(5));
-        db.txnLog.add(createProposal(6));
-        db.txnLog.add(createProposal(7));
-        db.txnLog.add(createProposal(8));
-        db.txnLog.add(createProposal(9));
-
-        db.lastProcessedZxid = 9;
-        db.committedLog.add(createProposal(6));
-        db.committedLog.add(createProposal(7));
-        db.committedLog.add(createProposal(8));
-
-        // Peer has zxid that we have never seen
-        peerZxid = 4;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
-        assertOpType(Leader.TRUNC, 3, db.getmaxCommittedLog());
-        // DIFF + 4 proposals + 4 commit
-        assertEquals(9, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { 5, 6, 7, 8 });
-        reset();
-
-        // Peer zxid is in txnlog range
-        peerZxid = 3;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF and forward any packet starting at maxCommittedLog
-        assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
-                db.getmaxCommittedLog());
-        // DIFF + 4 proposals + 4 commit
-        assertEquals(9, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { 5, 6, 7, 8 });
-        reset();
-
-    }
-
-    /**
-     * Test case verifying TxnLogProposalIterator closure.
-     */
-    @Test
-    public void testTxnLogProposalIteratorClosure() throws Exception {
-        long peerZxid;
-
-        // CommmitedLog is empty, we will use txnlog up to lastProcessZxid
-        db = new MockZKDatabase(null) {
-            @Override
-            public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid,
-                    long limit) {
-                return TxnLogProposalIterator.EMPTY_ITERATOR;
-            }
-        };
-        db.lastProcessedZxid = 7;
-        db.txnLog.add(createProposal(2));
-        db.txnLog.add(createProposal(3));
-
-        // Peer zxid
-        peerZxid = 4;
-        assertTrue("Couldn't identify snapshot transfer!",
-                learnerHandler.syncFollower(peerZxid, db, leader));
-        reset();
-    }
-
-    /**
-     * Test cases when txnlog is enabled and commitedLog is empty
-     */
-    @Test
-    public void testTxnLogOnly() throws Exception {
-        long peerZxid;
-
-        // CommmitedLog is empty, we will use txnlog up to lastProcessZxid
-        db.lastProcessedZxid = 7;
-        db.txnLog.add(createProposal(2));
-        db.txnLog.add(createProposal(3));
-        db.txnLog.add(createProposal(5));
-        db.txnLog.add(createProposal(6));
-        db.txnLog.add(createProposal(7));
-        db.txnLog.add(createProposal(8));
-
-        // Peer has zxid that we have never seen
-        peerZxid = 4;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send TRUNC to 3 and forward any packet starting at
-        // lastProcessedZxid
-        assertOpType(Leader.TRUNC, 3, db.lastProcessedZxid);
-        // DIFF + 3 proposals + 3 commit
-        assertEquals(7, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { 5, 6, 7 });
-        reset();
-
-        // Peer has zxid in txnlog range
-        peerZxid = 2;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF and forward any packet starting at lastProcessedZxid
-        assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
-        // DIFF + 4 proposals + 4 commit
-        assertEquals(9, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { 3, 5, 6, 7 });
-        reset();
-
-        // Peer miss the txnlog
-        peerZxid = 1;
-        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send snap
-        assertEquals(0, learnerHandler.getQueuedPackets().size());
-        reset();
-    }
-
-    long getZxid(long epoch, long counter){
-        return ZxidUtils.makeZxid(epoch, counter);
-    }
-
-    /**
-     * Test cases with zxids that are negative long
-     */
-    @Test
-    public void testTxnLogWithNegativeZxid() throws Exception {
-        long peerZxid;
-        db.txnLog.add(createProposal(getZxid(0xf, 2)));
-        db.txnLog.add(createProposal(getZxid(0xf, 3)));
-        db.txnLog.add(createProposal(getZxid(0xf, 5)));
-        db.txnLog.add(createProposal(getZxid(0xf, 6)));
-        db.txnLog.add(createProposal(getZxid(0xf, 7)));
-        db.txnLog.add(createProposal(getZxid(0xf, 8)));
-        db.txnLog.add(createProposal(getZxid(0xf, 9)));
-
-        db.lastProcessedZxid = getZxid(0xf, 9);
-        db.committedLog.add(createProposal(getZxid(0xf, 6)));
-        db.committedLog.add(createProposal(getZxid(0xf, 7)));
-        db.committedLog.add(createProposal(getZxid(0xf, 8)));
-
-        // Peer has zxid that we have never seen
-        peerZxid = getZxid(0xf, 4);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
-        assertOpType(Leader.TRUNC, getZxid(0xf, 3), db.getmaxCommittedLog());
-        // DIFF + 4 proposals + 4 commit
-        assertEquals(9, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { getZxid(0xf, 5),
-                getZxid(0xf, 6), getZxid(0xf, 7), getZxid(0xf, 8) });
-        reset();
-
-        // Peer zxid is in txnlog range
-        peerZxid = getZxid(0xf, 3);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF and forward any packet starting at maxCommittedLog
-        assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
-                db.getmaxCommittedLog());
-        // DIFF + 4 proposals + 4 commit
-        assertEquals(9, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { getZxid(0xf, 5),
-                getZxid(0xf, 6), getZxid(0xf, 7), getZxid(0xf, 8) });
-        reset();
-    }
-
-    /**
-     * Test cases when peer has new-epoch zxid
-     */
-    @Test
-    public void testNewEpochZxid() throws Exception {
-        long peerZxid;
-        db.txnLog.add(createProposal(getZxid(0, 1)));
-        db.txnLog.add(createProposal(getZxid(1, 1)));
-        db.txnLog.add(createProposal(getZxid(1, 2)));
-
-        // After leader election, lastProcessedZxid will point to new epoch
-        db.lastProcessedZxid = getZxid(2, 0);
-        db.committedLog.add(createProposal(getZxid(1, 1)));
-        db.committedLog.add(createProposal(getZxid(1, 2)));
-
-        // Peer has zxid of epoch 0
-        peerZxid = getZxid(0, 0);
-        // We should get snap, we can do better here, but the main logic is
-        // that we should never send diff if we have never seen any txn older
-        // than peer zxid
-        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
-        assertEquals(0, learnerHandler.getQueuedPackets().size());
-        reset();
-
-        // Peer has zxid of epoch 1
-        peerZxid = getZxid(1, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
-        assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
-        // DIFF + 2 proposals + 2 commit
-        assertEquals(5, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { getZxid(1, 1), getZxid(1, 2)});
-        reset();
-
-        // Peer has zxid of epoch 2, so it is already sync
-        peerZxid = getZxid(2, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF to (2, 0) and forward any packet starting at (2, 0)
-        assertOpType(Leader.DIFF, getZxid(2, 0), getZxid(2, 0));
-        // DIFF only
-        assertEquals(1, learnerHandler.getQueuedPackets().size());
-        reset();
-
-    }
-
-    /**
-     * Test cases when learner has new-epcoh zxid
-     * (zxid & 0xffffffffL) == 0;
-     */
-    @Test
-    public void testNewEpochZxidWithTxnlogOnly() throws Exception {
-        long peerZxid;
-        db.txnLog.add(createProposal(getZxid(1, 1)));
-        db.txnLog.add(createProposal(getZxid(2, 1)));
-        db.txnLog.add(createProposal(getZxid(2, 2)));
-        db.txnLog.add(createProposal(getZxid(4, 1)));
-
-        // After leader election, lastProcessedZxid will point to new epoch
-        db.lastProcessedZxid = getZxid(6, 0);
-
-        // Peer has zxid of epoch 3
-        peerZxid = getZxid(3, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF to (6,0) and forward any packet starting at (4,1)
-        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1));
-        // DIFF + 1 proposals + 1 commit
-        assertEquals(3, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { getZxid(4, 1)});
-        reset();
-
-        // Peer has zxid of epoch 4
-        peerZxid = getZxid(4, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF to (6,0) and forward any packet starting at (4,1)
-        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1));
-        // DIFF + 1 proposals + 1 commit
-        assertEquals(3, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { getZxid(4, 1)});
-        reset();
-
-        // Peer has zxid of epoch 5
-        peerZxid = getZxid(5, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF to (6,0) and forward any packet starting at (5,0)
-        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(5, 0));
-        // DIFF only
-        assertEquals(1, learnerHandler.getQueuedPackets().size());
-        reset();
-
-        // Peer has zxid of epoch 6
-        peerZxid = getZxid(6, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF to (6,0) and forward any packet starting at (6, 0)
-        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(6, 0));
-        // DIFF only
-        assertEquals(1, learnerHandler.getQueuedPackets().size());
-        reset();
-    }
-
-    /**
-     * Test cases when there is a duplicate txn in the committedLog. This
-     * should never happen unless there is a bug in initialization code
-     * but the learner should never see duplicate packets
-     */
-    @Test
-    public void testDuplicatedTxn() throws Exception {
-        long peerZxid;
-        db.txnLog.add(createProposal(getZxid(0, 1)));
-        db.txnLog.add(createProposal(getZxid(1, 1)));
-        db.txnLog.add(createProposal(getZxid(1, 2)));
-        db.txnLog.add(createProposal(getZxid(1, 1)));
-        db.txnLog.add(createProposal(getZxid(1, 2)));
-
-        // After leader election, lastProcessedZxid will point to new epoch
-        db.lastProcessedZxid = getZxid(2, 0);
-        db.committedLog.add(createProposal(getZxid(1, 1)));
-        db.committedLog.add(createProposal(getZxid(1, 2)));
-        db.committedLog.add(createProposal(getZxid(1, 1)));
-        db.committedLog.add(createProposal(getZxid(1, 2)));
-
-        // Peer has zxid of epoch 1
-        peerZxid = getZxid(1, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
-        // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
-        assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
-        // DIFF + 2 proposals + 2 commit
-        assertEquals(5, learnerHandler.getQueuedPackets().size());
-        queuedPacketMatches(new long[] { getZxid(1, 1), getZxid(1, 2)});
-        reset();
-
-    }
-
-    /**
-     * Test cases when we have to TRUNC learner, but it may cross epoch boundary
-     * so we need to send snap instead
-     */
-    @Test
-    public void testCrossEpochTrunc() throws Exception {
-        long peerZxid;
-        db.txnLog.add(createProposal(getZxid(1, 1)));
-        db.txnLog.add(createProposal(getZxid(2, 1)));
-        db.txnLog.add(createProposal(getZxid(2, 2)));
-        db.txnLog.add(createProposal(getZxid(4, 1)));
-
-        // After leader election, lastProcessedZxid will point to new epoch
-        db.lastProcessedZxid = getZxid(6, 0);
-
-        // Peer has zxid (3, 1)
-        peerZxid = getZxid(3, 1);
-        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
-        assertEquals(0, learnerHandler.getQueuedPackets().size());
-        reset();
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
deleted file mode 100644
index c2d65e3..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LearnerSnapshotThrottlerTest extends ZKTestCase {
-    private static final Logger LOG =
-            LoggerFactory.getLogger(LearnerSnapshotThrottlerTest.class);
-
-    @Test(expected = SnapshotThrottleException.class)
-    public void testTooManySnapshotsNonessential() throws Exception {
-        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
-        for (int i = 0; i < 6; i++) {
-            throttler.beginSnapshot(false);
-        }
-    }
-
-    @Test(expected = SnapshotThrottleException.class)
-    public void testTooManySnapshotsEssential() throws Exception {
-        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
-        try {
-            for (int i = 0; i < 6; i++) {
-                throttler.beginSnapshot(true);
-            }
-        }
-        catch (SnapshotThrottleException ex) {
-            Assert.fail("essential snapshots should not be throttled");
-        }
-        throttler.endSnapshot();
-        throttler.beginSnapshot(false);
-    }
-
-    @Test
-    public void testNoThrottle() throws Exception {
-        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
-        try {
-            for (int i = 0; i < 6; i++) {
-                throttler.beginSnapshot(true);
-            }
-        }
-        catch (SnapshotThrottleException ex) {
-            Assert.fail("essential snapshots should not be throttled");
-        }
-        throttler.endSnapshot();
-        for (int i = 0; i < 5; i++) {
-            throttler.endSnapshot();
-            throttler.beginSnapshot(false);
-        }
-    }
-
-    @Test
-    public void testTryWithResourceNoThrottle() throws Exception {
-        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
-        for (int i = 0; i < 3; i++) {
-            LearnerSnapshot snapshot = throttler.beginSnapshot(false);
-            try {
-                Assert.assertFalse(snapshot.isEssential());
-                Assert.assertEquals(1, snapshot.getConcurrentSnapshotNumber());
-            } finally {
-                snapshot.close();
-            }
-        }
-    }
-
-    @Test(expected = SnapshotThrottleException.class)
-    public void testTryWithResourceThrottle() throws Exception {
-        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
-        LearnerSnapshot outer = throttler.beginSnapshot(true);
-        try {
-            LearnerSnapshot inner = throttler.beginSnapshot(false);
-            try {
-                Assert.fail("shouldn't be able to have both snapshots open");
-            } finally {
-                inner.close();
-            }
-        } finally {
-            outer.close();
-        }
-    }
-
-    @Test
-    public void testParallelNoThrottle() throws Exception {
-        final int numThreads = 50;
-
-        final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(numThreads);
-        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
-        final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
-        final CountDownLatch snapshotProgressLatch = new CountDownLatch(numThreads);
-
-        List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            results.add(threadPool.submit(new Callable<Boolean>() {
-
-                @Override
-                public Boolean call() {
-                    threadStartLatch.countDown();
-                    try {
-                        threadStartLatch.await();
-
-                        throttler.beginSnapshot(false);
-
-                        snapshotProgressLatch.countDown();
-                        snapshotProgressLatch.await();
-
-                        throttler.endSnapshot();
-                    }
-                    catch (Exception e) {
-                        return false;
-                    }
-
-                    return true;
-                }
-            }));
-        }
-
-        for (Future<Boolean> result : results) {
-            Assert.assertTrue(result.get());
-        }
-    }
-
-    @Test
-    public void testPositiveTimeout() throws Exception {
-        final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1, 200);
-        ExecutorService threadPool = Executors.newFixedThreadPool(1);
-
-        LearnerSnapshot first = throttler.beginSnapshot(false);
-        final CountDownLatch snapshotProgressLatch = new CountDownLatch(1);
-
-        Future<Boolean> result = threadPool.submit(new Callable<Boolean>() {
-            @Override
-            public Boolean call() {
-                try {
-                    snapshotProgressLatch.countDown();
-                    LearnerSnapshot second = throttler.beginSnapshot(false);
-                    second.close();
-                }
-                catch (Exception e) {
-                    return false;
-                }
-
-                return true;
-            }
-        });
-
-        snapshotProgressLatch.await();
-
-        first.close();
-
-        Assert.assertTrue(result.get());
-    }
-
-    @Test
-    public void testHighContentionWithTimeout() throws Exception {
-        int numThreads = 20;
-
-        final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(2, 5000);
-        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
-        final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
-
-        List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            results.add(threadPool.submit(new Callable<Boolean>() {
-
-                @Override
-                public Boolean call() {
-                    threadStartLatch.countDown();
-                    try {
-                        threadStartLatch.await();
-
-                        LearnerSnapshot snap = throttler.beginSnapshot(false);
-
-                        int snapshotNumber = snap.getConcurrentSnapshotNumber();
-
-                        throttler.endSnapshot();
-
-                        return snapshotNumber <= 2;
-                    }
-                    catch (Exception e) {
-                        LOG.error("Exception trying to begin snapshot", e);
-                        return false;
-                    }
-                }
-            }));
-        }
-
-        for (Future<Boolean> result : results) {
-            Assert.assertTrue(result.get());
-        }
-    }
-}


Mime
View raw message