http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java b/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
deleted file mode 100644
index 9546c25..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server.quorum;
-
-import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-import org.apache.zookeeper.test.ClientBase;
-import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.security.sasl.SaslException;
-
-public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
- private static int SERVER_COUNT = 3;
- private MainThread[] mt = new MainThread[SERVER_COUNT];
-
- /**
- * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2355.
- * ZooKeeper ephemeral node is never deleted if follower fail while reading
- * the proposal packet.
- */
-
- @Test(timeout = 120000)
- public void testEphemeralNodeDeletion() throws Exception {
- final int clientPorts[] = new int[SERVER_COUNT];
- StringBuilder sb = new StringBuilder();
- String server;
-
- for (int i = 0; i < SERVER_COUNT; i++) {
- clientPorts[i] = PortAssignment.unique();
- server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
- + clientPorts[i];
- sb.append(server + "\n");
- }
- String currentQuorumCfgSection = sb.toString();
- // start all the servers
- for (int i = 0; i < SERVER_COUNT; i++) {
- mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
- false) {
- @Override
- public TestQPMain getTestQPMain() {
- return new MockTestQPMain();
- }
- };
- mt[i].start();
- }
-
- // ensure all servers started
- for (int i = 0; i < SERVER_COUNT; i++) {
- Assert.assertTrue("waiting for server " + i + " being up",
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
- CONNECTION_TIMEOUT));
- }
-
- CountdownWatcher watch = new CountdownWatcher();
- ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[1],
- ClientBase.CONNECTION_TIMEOUT, watch);
- watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
-
- /**
- * now the problem scenario starts
- */
-
- Stat firstEphemeralNode = new Stat();
-
- // 1: create ephemeral node
- String nodePath = "/e1";
- zk.create(nodePath, "1".getBytes(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL, firstEphemeralNode);
- assertEquals("Current session and ephemeral owner should be same",
- zk.getSessionId(), firstEphemeralNode.getEphemeralOwner());
-
- // 2: inject network problem in one of the follower
- CustomQuorumPeer follower = (CustomQuorumPeer) getByServerState(mt,
- ServerState.FOLLOWING);
- follower.setInjectError(true);
-
- // 3: close the session so that ephemeral node is deleted
- zk.close();
-
- // remove the error
- follower.setInjectError(false);
-
- Assert.assertTrue("Faulted Follower should have joined quorum by now",
- ClientBase.waitForServerUp(
- "127.0.0.1:" + follower.getClientPort(),
- CONNECTION_TIMEOUT));
-
- QuorumPeer leader = getByServerState(mt, ServerState.LEADING);
- assertNotNull("Leader should not be null", leader);
- Assert.assertTrue("Leader must be running", ClientBase.waitForServerUp(
- "127.0.0.1:" + leader.getClientPort(), CONNECTION_TIMEOUT));
-
- watch = new CountdownWatcher();
- zk = new ZooKeeper("127.0.0.1:" + leader.getClientPort(),
- ClientBase.CONNECTION_TIMEOUT, watch);
- watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
-
- Stat exists = zk.exists(nodePath, false);
- assertNull("Node must have been deleted from leader", exists);
-
- CountdownWatcher followerWatch = new CountdownWatcher();
- ZooKeeper followerZK = new ZooKeeper(
- "127.0.0.1:" + follower.getClientPort(),
- ClientBase.CONNECTION_TIMEOUT, followerWatch);
- followerWatch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
- Stat nodeAtFollower = followerZK.exists(nodePath, false);
-
- // Problem 1: Follower had one extra ephemeral node /e1
- assertNull("ephemeral node must not exist", nodeAtFollower);
-
- // Create the node with another session
- Stat currentEphemeralNode = new Stat();
- zk.create(nodePath, "2".getBytes(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL, currentEphemeralNode);
-
- // close the session and newly created ephemeral node should be deleted
- zk.close();
-
- SyncCallback cb = new SyncCallback();
- followerZK.sync(nodePath, cb, null);
- cb.sync.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
-
- nodeAtFollower = followerZK.exists(nodePath, false);
-
- // Problem 2: Before fix, after session close the ephemeral node
- // was not getting deleted. But now after the fix after session close
- // ephemeral node is getting deleted.
- assertNull("After session close ephemeral node must be deleted",
- nodeAtFollower);
- followerZK.close();
- }
-
- @After
- public void tearDown() {
- // stop all severs
- for (int i = 0; i < mt.length; i++) {
- try {
- mt[i].shutdown();
- } catch (InterruptedException e) {
- LOG.warn("Quorum Peer interrupted while shutting it down", e);
- }
- }
- }
-
- private QuorumPeer getByServerState(MainThread[] mt, ServerState state) {
- for (int i = mt.length - 1; i >= 0; i--) {
- QuorumPeer quorumPeer = mt[i].getQuorumPeer();
- if (null != quorumPeer && state == quorumPeer.getPeerState()) {
- return quorumPeer;
- }
- }
- return null;
- }
-
- static class CustomQuorumPeer extends QuorumPeer {
- private boolean injectError = false;
-
- public CustomQuorumPeer() throws SaslException {
-
- }
-
- @Override
- protected Follower makeFollower(FileTxnSnapLog logFactory)
- throws IOException {
- return new Follower(this, new FollowerZooKeeperServer(logFactory,
- this, this.getZkDb())) {
-
- @Override
- void readPacket(QuorumPacket pp) throws IOException {
- /**
- * In real scenario got SocketTimeoutException while reading
- * the packet from leader because of network problem, but
- * here throwing SocketTimeoutException based on whether
- * error is injected or not
- */
- super.readPacket(pp);
- if (injectError && pp.getType() == Leader.PROPOSAL) {
- String type = LearnerHandler.packetToString(pp);
- throw new SocketTimeoutException(
- "Socket timeout while reading the packet for operation "
- + type);
- }
- }
-
- };
- }
-
- public void setInjectError(boolean injectError) {
- this.injectError = injectError;
- }
-
- }
-
- static class MockTestQPMain extends TestQPMain {
- @Override
- protected QuorumPeer getQuorumPeer() throws SaslException {
- return new CustomQuorumPeer();
- }
- }
-
- private static class SyncCallback implements AsyncCallback.VoidCallback {
- private final CountDownLatch sync = new CountDownLatch(1);
-
- @Override
- public void processResult(int rc, String path, Object ctx) {
- sync.countDown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
deleted file mode 100644
index f50d0bb..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.quorum.QuorumCnxManager;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.Vote;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-import org.apache.zookeeper.test.ClientBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FLEBackwardElectionRoundTest extends ZKTestCase {
- protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
-
- int count;
- Map<Long,QuorumServer> peers;
- File tmpdir[];
- int port[];
-
- QuorumCnxManager cnxManagers[];
-
- @Before
- public void setUp() throws Exception {
- count = 3;
-
- peers = new HashMap<Long,QuorumServer>(count);
- tmpdir = new File[count];
- port = new int[count];
- cnxManagers = new QuorumCnxManager[count - 1];
- }
-
- @After
- public void tearDown() throws Exception {
- for(int i = 0; i < (count - 1); i++){
- if(cnxManagers[i] != null){
- cnxManagers[i].halt();
- }
- }
- }
-
- /**
- * This test is checking the following case. A server S is
- * currently LOOKING and it receives notifications from
- * a quorum indicating they are following S. The election
- * round E of S is higher than the election round E' in the
- * notification messages, so S becomes the leader and sets
- * its epoch back to E'. In the meanwhile, one or more
- * followers turn to LOOKING and elect S in election round E.
- * Having leader and followers with different election rounds
- * might prevent other servers from electing a leader because
- * they can't get a consistent set of notifications from a
- * quorum.
- *
- * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1514}
- *
- *
- * @throws Exception
- */
-
- @Test
- public void testBackwardElectionRound() throws Exception {
- LOG.info("TestLE: {}, {}", getTestName(), count);
- for(int i = 0; i < count; i++) {
- int clientport = PortAssignment.unique();
- peers.put(Long.valueOf(i),
- new QuorumServer(i,
- new InetSocketAddress(clientport),
- new InetSocketAddress(PortAssignment.unique())));
- tmpdir[i] = ClientBase.createTmpDir();
- port[i] = clientport;
- }
-
- ByteBuffer initialMsg0 = getMsg();
- ByteBuffer initialMsg1 = getMsg();
-
- /*
- * Start server 0
- */
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
- peer.startLeaderElection();
- FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 0);
- thread.start();
-
- /*
- * Start mock server 1
- */
- QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
- cnxManagers[0] = mockPeer.createCnxnManager();
- cnxManagers[0].listener.start();
-
- cnxManagers[0].toSend(0l, initialMsg0);
-
- /*
- * Start mock server 2
- */
- mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
- cnxManagers[1] = mockPeer.createCnxnManager();
- cnxManagers[1].listener.start();
-
- cnxManagers[1].toSend(0l, initialMsg1);
-
- /*
- * Run another instance of leader election.
- */
- thread.join(5000);
- thread = new FLETestUtils.LEThread(peer, 0);
- thread.start();
-
- /*
- * Send the same messages, this time should not make 0 the leader.
- */
- cnxManagers[0].toSend(0l, initialMsg0);
- cnxManagers[1].toSend(0l, initialMsg1);
-
- thread.join(5000);
-
- if (!thread.isAlive()) {
- Assert.fail("Should not have joined");
- }
-
- }
-
- private ByteBuffer getMsg() {
- return FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
deleted file mode 100644
index 6583f90..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.quorum.FastLeaderElection;
-import org.apache.zookeeper.server.quorum.QuorumCnxManager;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-import org.apache.zookeeper.test.ClientBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FLELostMessageTest extends ZKTestCase {
- protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
-
- int count;
- HashMap<Long,QuorumServer> peers;
- File tmpdir[];
- int port[];
-
- QuorumCnxManager cnxManager;
-
- @Before
- public void setUp() throws Exception {
- count = 3;
-
- peers = new HashMap<Long,QuorumServer>(count);
- tmpdir = new File[count];
- port = new int[count];
- }
-
- @After
- public void tearDown() throws Exception {
- cnxManager.halt();
- }
-
- @Test
- public void testLostMessage() throws Exception {
- LOG.info("TestLE: {}, {}", getTestName(), count);
- for(int i = 0; i < count; i++) {
- int clientport = PortAssignment.unique();
- peers.put(Long.valueOf(i),
- new QuorumServer(i,
- new InetSocketAddress(clientport),
- new InetSocketAddress(PortAssignment.unique())));
- tmpdir[i] = ClientBase.createTmpDir();
- port[i] = clientport;
- }
-
- /*
- * Start server 0
- */
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
- peer.startLeaderElection();
- FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 1);
- thread.start();
-
- /*
- * Start mock server 1
- */
- mockServer();
- thread.join(5000);
- if (thread.isAlive()) {
- Assert.fail("Threads didn't join");
- }
- }
-
- void mockServer() throws InterruptedException, IOException {
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
- cnxManager = peer.createCnxnManager();
- cnxManager.listener.start();
-
- cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0, 0));
- cnxManager.recvQueue.take();
- cnxManager.toSend(1L, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 1, 0, 0));
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java b/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
deleted file mode 100644
index a907abd..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.nio.ByteBuffer;
-
-import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.quorum.FastLeaderElection;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.Vote;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Assert;
-
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-
-public class FLETestUtils extends ZKTestCase {
- protected static final Logger LOG = LoggerFactory.getLogger(FLETestUtils.class);
-
- /*
- * Thread to run an instance of leader election for
- * a given quorum peer.
- */
- static class LEThread extends Thread {
- private int i;
- private QuorumPeer peer;
-
- LEThread(QuorumPeer peer, int i) {
- this.i = i;
- this.peer = peer;
- LOG.info("Constructor: {}", getName());
-
- }
-
- public void run() {
- try {
- Vote v = null;
- peer.setPeerState(ServerState.LOOKING);
- LOG.info("Going to call leader election: {}", i);
- v = peer.getElectionAlg().lookForLeader();
-
- if (v == null) {
- Assert.fail("Thread " + i + " got a null vote");
- }
-
- /*
- * A real zookeeper would take care of setting the current vote. Here
- * we do it manually.
- */
- peer.setCurrentVote(v);
-
- LOG.info("Finished election: {}, {}", i, v.getId());
-
- Assert.assertTrue("State is not leading.", peer.getPeerState() == ServerState.LEADING);
- } catch (Exception e) {
- e.printStackTrace();
- }
- LOG.info("Joining");
- }
- }
-
- /*
- * Creates a leader election notification message.
- */
- static ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
- return FastLeaderElection.buildMsg(state, leader, zxid, 1, epoch);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
deleted file mode 100644
index c4ca8a8..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.security.sasl.SaslException;
-
-import org.apache.jute.OutputArchive;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.Op;
-
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.DataNode;
-import org.apache.zookeeper.server.DataTree;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.test.ClientBase;
-
-import org.junit.Assert;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test cases used to catch corner cases due to fuzzy snapshot.
- */
-public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(FuzzySnapshotRelatedTest.class);
-
- MainThread[] mt = null;
- ZooKeeper[] zk = null;
- int[] clientPorts = null;
- int leaderId;
- int followerA;
-
- @Before
- public void setup() throws Exception {
- LOG.info("Start up a 3 server quorum");
- final int ENSEMBLE_SERVERS = 3;
- clientPorts = new int[ENSEMBLE_SERVERS];
- StringBuilder sb = new StringBuilder();
- String server;
-
- for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
- clientPorts[i] = PortAssignment.unique();
- server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
- + clientPorts[i];
- sb.append(server + "\n");
- }
- String currentQuorumCfgSection = sb.toString();
-
- // start servers
- mt = new MainThread[ENSEMBLE_SERVERS];
- zk = new ZooKeeper[ENSEMBLE_SERVERS];
- for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
- mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
- false) {
- @Override
- public TestQPMain getTestQPMain() {
- return new CustomizedQPMain();
- }
- };
- mt[i].start();
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
- ClientBase.CONNECTION_TIMEOUT, this);
- }
- QuorumPeerMainTest.waitForAll(zk, States.CONNECTED);
- LOG.info("all servers started");
-
- leaderId = -1;
- followerA = -1;
- for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
- if (mt[i].main.quorumPeer.leader != null) {
- leaderId = i;
- } else if (followerA == -1) {
- followerA = i;
- }
- }
- }
-
- @After
- public void tearDown() throws Exception {
- if (mt != null) {
- for (MainThread t: mt) {
- t.shutdown();
- }
- }
-
- if (zk != null) {
- for (ZooKeeper z: zk) {
- z.close();
- }
- }
- }
-
- @Test
- public void testMultiOpConsistency() throws Exception {
- LOG.info("Create a parent node");
- final String path = "/testMultiOpConsistency";
- createEmptyNode(zk[followerA], path);
-
- LOG.info("Hook to catch the 2nd sub create node txn in multi-op");
- CustomDataTree dt =
- (CustomDataTree) mt[followerA].main.quorumPeer.getZkDb().getDataTree();
-
- final ZooKeeperServer zkServer = mt[followerA].main.quorumPeer.getActiveServer();
-
- String node1 = path + "/1";
- String node2 = path + "/2";
-
- dt.addNodeCreateListener(node2, new NodeCreateListener() {
- @Override
- public void process(String path) {
- LOG.info("Take a snapshot");
- zkServer.takeSnapshot(true);
- }
- });
-
- LOG.info("Issue a multi op to create 2 nodes");
- zk[followerA].multi(Arrays.asList(
- Op.create(node1, node1.getBytes(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
- Op.create(node2, node2.getBytes(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT))
- );
-
- LOG.info("Restart the server");
- mt[followerA].shutdown();
- QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
-
- mt[followerA].start();
- QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
-
- LOG.info("Make sure the node consistent with leader");
- Assert.assertEquals(new String(zk[leaderId].getData(node2, null, null)),
- new String(zk[followerA].getData(node2, null, null)));
- }
-
- /**
- * It's possibel during SNAP sync, the parent is serialized before the
- * child get deleted during sending the snapshot over.
- *
- * In which case, we need to make sure the pzxid get correctly updated
- * when applying the txns received.
- */
- @Test
- public void testPZxidUpdatedDuringSnapSyncing() throws Exception {
- LOG.info("Enable force snapshot sync");
- System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
-
- final String parent = "/testPZxidUpdatedWhenDeletingNonExistNode";
- final String child = parent + "/child";
- createEmptyNode(zk[leaderId], parent);
- createEmptyNode(zk[leaderId], child);
-
- LOG.info("shutdown follower {}", followerA);
- mt[followerA].shutdown();
- QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
-
- LOG.info("Set up ZKDatabase to catch the node serializing in DataTree");
- addSerializeListener(leaderId, parent, child);
-
- LOG.info("Restart follower A to trigger a SNAP sync with leader");
- mt[followerA].start();
- QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
-
- LOG.info("Check and make sure the pzxid of the parent is the same " +
- "on leader and follower A");
- compareStat(parent, leaderId, followerA);
- }
-
- /**
- * It's possible during taking fuzzy snapshot, the parent is serialized
- * before the child get deleted in the fuzzy range.
- *
- * In which case, we need to make sure the pzxid get correctly updated
- * when replaying the txns.
- */
- @Test
- public void testPZxidUpdatedWhenLoadingSnapshot() throws Exception {
-
- final String parent = "/testPZxidUpdatedDuringTakingSnapshot";
- final String child = parent + "/child";
- createEmptyNode(zk[followerA], parent);
- createEmptyNode(zk[followerA], child);
-
- LOG.info("Set up ZKDatabase to catch the node serializing in DataTree");
- addSerializeListener(followerA, parent, child);
-
- LOG.info("Take snapshot on follower A");
- ZooKeeperServer zkServer = mt[followerA].main.quorumPeer.getActiveServer();
- zkServer.takeSnapshot(true);
-
- LOG.info("Restarting follower A to load snapshot");
- mt[followerA].shutdown();
- mt[followerA].start();
- QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
-
- LOG.info("Check and make sure the pzxid of the parent is the same " +
- "on leader and follower A");
- compareStat(parent, leaderId, followerA);
- }
-
- private void addSerializeListener(int sid, String parent, String child) {
- final ZooKeeper zkClient = zk[followerA];
- CustomDataTree dt =
- (CustomDataTree) mt[sid].main.quorumPeer.getZkDb().getDataTree();
- dt.addListener(parent, new NodeSerializeListener() {
- @Override
- public void nodeSerialized(String path) {
- try {
- zkClient.delete(child, -1);
- LOG.info("Deleted the child node after the parent is serialized");
- } catch (Exception e) {
- LOG.error("Error when deleting node {}", e);
- }
- }
- });
- }
-
- private void compareStat(String path, int sid, int compareWithSid) throws Exception{
- Stat stat1 = new Stat();
- zk[sid].getData(path, null, stat1);
-
- Stat stat2 = new Stat();
- zk[compareWithSid].getData(path, null, stat2);
-
- Assert.assertEquals(stat1, stat2);
- }
-
- @Test
- public void testGlobalSessionConsistency() throws Exception {
- LOG.info("Hook to catch the commitSession event on followerA");
- CustomizedQPMain followerAMain = (CustomizedQPMain) mt[followerA].main;
- final ZooKeeperServer zkServer = followerAMain.quorumPeer.getActiveServer();
-
- // only take snapshot for the next global session we're going to create
- final AtomicBoolean shouldTakeSnapshot = new AtomicBoolean(true);
- followerAMain.setCommitSessionListener(new CommitSessionListener() {
- @Override
- public void process(long sessionId) {
- LOG.info("Take snapshot");
- if (shouldTakeSnapshot.getAndSet(false)) {
- zkServer.takeSnapshot(true);
- }
- }
- });
-
- LOG.info("Create a global session");
- ZooKeeper globalClient = new ZooKeeper(
- "127.0.0.1:" + clientPorts[followerA],
- ClientBase.CONNECTION_TIMEOUT, this);
- QuorumPeerMainTest.waitForOne(globalClient, States.CONNECTED);
-
- LOG.info("Restart followerA to load the data from disk");
- mt[followerA].shutdown();
- QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
-
- mt[followerA].start();
- QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
-
- LOG.info("Make sure the global sessions are consistent with leader");
-
- Map<Long, Integer> globalSessionsOnLeader =
- mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
- if (mt[followerA].main.quorumPeer == null) {
- LOG.info("quorumPeer is null");
- }
- if (mt[followerA].main.quorumPeer.getZkDb() == null) {
- LOG.info("zkDb is null");
- }
- Map<Long, Integer> globalSessionsOnFollowerA =
- mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
- LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(),
- globalSessionsOnFollowerA.keySet());
- Assert.assertTrue(globalSessionsOnFollowerA.keySet().containsAll(
- globalSessionsOnLeader.keySet()));
- }
-
- private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
- zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
-
- static interface NodeCreateListener {
- public void process(String path);
-
- }
-
- static class CustomDataTree extends DataTree {
- Map<String, NodeCreateListener> nodeCreateListeners =
- new HashMap<String, NodeCreateListener>();
- Map<String, NodeSerializeListener> listeners =
- new HashMap<String, NodeSerializeListener>();
-
- @Override
- public void serializeNodeData(OutputArchive oa, String path,
- DataNode node) throws IOException {
- super.serializeNodeData(oa, path, node);
- NodeSerializeListener listener = listeners.get(path);
- if (listener != null) {
- listener.nodeSerialized(path);
- }
- }
-
- public void addListener(String path, NodeSerializeListener listener) {
- listeners.put(path, listener);
- }
-
- @Override
- public void createNode(final String path, byte data[], List<ACL> acl,
- long ephemeralOwner, int parentCVersion, long zxid,
- long time, Stat outputStat)
- throws NoNodeException, NodeExistsException {
- NodeCreateListener listener = nodeCreateListeners.get(path);
- if (listener != null) {
- listener.process(path);
- }
- super.createNode(path, data, acl, ephemeralOwner, parentCVersion,
- zxid, time, outputStat);
- }
-
- public void addNodeCreateListener(String path, NodeCreateListener listener) {
- nodeCreateListeners.put(path, listener);
- }
- }
-
- static interface NodeSerializeListener {
- public void nodeSerialized(String path);
- }
-
- static interface CommitSessionListener {
- public void process(long sessionId);
- }
-
- static class CustomizedQPMain extends TestQPMain {
- CommitSessionListener commitSessionListener;
-
- public void setCommitSessionListener(CommitSessionListener listener) {
- this.commitSessionListener = listener;
- }
-
- @Override
- protected QuorumPeer getQuorumPeer() throws SaslException {
- return new QuorumPeer() {
- @Override
- public void setZKDatabase(ZKDatabase database) {
- super.setZKDatabase(new ZKDatabase(this.getTxnFactory()) {
- @Override
- public DataTree createDataTree() {
- return new CustomDataTree();
- }
- });
- }
-
- @Override
- protected Follower makeFollower(FileTxnSnapLog logFactory)
- throws IOException {
- return new Follower(this, new FollowerZooKeeperServer(
- logFactory, this, this.getZkDb()) {
- @Override
- public void createSessionTracker() {
- sessionTracker = new LearnerSessionTracker(
- this, getZKDatabase().getSessionWithTimeOuts(),
- this.tickTime, self.getId(),
- self.areLocalSessionsEnabled(),
- getZooKeeperServerListener()) {
-
- public synchronized boolean commitSession(
- long sessionId, int sessionTimeout) {
- if (commitSessionListener != null) {
- commitSessionListener.process(sessionId);
- }
- return super.commitSession(sessionId, sessionTimeout);
- }
- };
- }
- });
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LeaderBeanTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
deleted file mode 100644
index b017551..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.jute.OutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
-import org.apache.zookeeper.server.util.SerializeUtils;
-import org.apache.zookeeper.test.ClientBase;
-import org.apache.zookeeper.txn.TxnHeader;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class LeaderBeanTest {
- private Leader leader;
- private LeaderBean leaderBean;
- private FileTxnSnapLog fileTxnSnapLog;
- private LeaderZooKeeperServer zks;
- private QuorumPeer qp;
-
- @Before
- public void setUp() throws IOException {
- qp = new QuorumPeer();
- QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
- qp.setQuorumVerifier(quorumVerifierMock, false);
- File tmpDir = ClientBase.createEmptyTestDir();
- fileTxnSnapLog = new FileTxnSnapLog(new File(tmpDir, "data"),
- new File(tmpDir, "data_txnlog"));
- ZKDatabase zkDb = new ZKDatabase(fileTxnSnapLog);
-
- zks = new LeaderZooKeeperServer(fileTxnSnapLog, qp, zkDb);
- leader = new Leader(qp, zks);
- leaderBean = new LeaderBean(leader, zks);
- }
-
- @After
- public void tearDown() throws IOException {
- fileTxnSnapLog.close();
- }
-
- @Test
- public void testGetName() {
- assertEquals("Leader", leaderBean.getName());
- }
-
- @Test
- public void testGetCurrentZxid() {
- // Arrange
- zks.setZxid(1);
-
- // Assert
- assertEquals("0x1", leaderBean.getCurrentZxid());
- }
-
- @Test
- public void testGetElectionTimeTaken() {
- // Arrange
- qp.setElectionTimeTaken(1);
-
- // Assert
- assertEquals(1, leaderBean.getElectionTimeTaken());
- }
-
- @Test
- public void testGetProposalSize() throws IOException, Leader.XidRolloverException {
- // Arrange
- Request req = createMockRequest();
-
- // Act
- leader.propose(req);
-
- // Assert
- byte[] data = SerializeUtils.serializeRequest(req);
- assertEquals(data.length, leaderBean.getLastProposalSize());
- assertEquals(data.length, leaderBean.getMinProposalSize());
- assertEquals(data.length, leaderBean.getMaxProposalSize());
- }
-
- @Test
- public void testResetProposalStats() throws IOException, Leader.XidRolloverException {
- // Arrange
- int initialProposalSize = leaderBean.getLastProposalSize();
- Request req = createMockRequest();
-
- // Act
- leader.propose(req);
-
- // Assert
- assertNotEquals(initialProposalSize, leaderBean.getLastProposalSize());
- leaderBean.resetProposalStatistics();
- assertEquals(initialProposalSize, leaderBean.getLastProposalSize());
- assertEquals(initialProposalSize, leaderBean.getMinProposalSize());
- assertEquals(initialProposalSize, leaderBean.getMaxProposalSize());
- }
-
- private Request createMockRequest() throws IOException {
- TxnHeader header = mock(TxnHeader.class);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- OutputArchive oa = (OutputArchive) args[0];
- oa.writeString("header", "test");
- return null;
- }
- }).when(header).serialize(any(OutputArchive.class), anyString());
- Record txn = mock(Record.class);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- OutputArchive oa = (OutputArchive) args[0];
- oa.writeString("record", "test");
- return null;
- }
- }).when(txn).serialize(any(OutputArchive.class), anyString());
- return new Request(1, 2, 3, header, txn, 4);
- }
-
- @Test
- public void testFollowerInfo() throws IOException {
- LearnerHandler follower = mock(LearnerHandler.class);
- when(follower.getLearnerType()).thenReturn(LearnerType.PARTICIPANT);
- when(follower.toString()).thenReturn("1");
- leader.addLearnerHandler(follower);
-
- assertEquals("1\n", leaderBean.followerInfo());
-
- LearnerHandler observer = mock(LearnerHandler.class);
- when(observer.getLearnerType()).thenReturn(LearnerType.OBSERVER);
- when(observer.toString()).thenReturn("2");
- leader.addLearnerHandler(observer);
-
- assertEquals("1\n", leaderBean.followerInfo());
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
deleted file mode 100644
index 2548aca..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.test.ClientBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
-import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
-
-public class LeaderWithObserverTest {
-
- QuorumPeer peer;
- Leader leader;
- File tmpDir;
- long participantId;
- long observerId;
-
- @Before
- public void setUp() throws Exception {
- tmpDir = ClientBase.createTmpDir();
- peer = createQuorumPeer(tmpDir);
- participantId = 1;
- Map<Long, QuorumPeer.QuorumServer> peers = peer.getQuorumVerifier().getAllMembers();
- observerId = peers.size();
- leader = createLeader(tmpDir, peer);
- peer.leader = leader;
- peers.put(observerId, new QuorumPeer.QuorumServer(
- observerId, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
- new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
- new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
- QuorumPeer.LearnerType.OBSERVER));
-
- // these tests are serial, we can speed up InterruptedException
- peer.tickTime = 1;
- }
-
- @After
- public void tearDown(){
- leader.shutdown("end of test");
- tmpDir.delete();
- }
-
- @Test
- public void testGetEpochToPropose() throws Exception {
- long lastAcceptedEpoch = 5;
- peer.setAcceptedEpoch(5);
-
- Assert.assertEquals("Unexpected vote in connectingFollowers", 0, leader.connectingFollowers.size());
- Assert.assertTrue(leader.waitingForNewEpoch);
- try {
- // Leader asks for epoch (mocking Leader.lead behavior)
- // First add to connectingFollowers
- leader.getEpochToPropose(peer.getId(), lastAcceptedEpoch);
- } catch (InterruptedException e) {
- // ignore timeout
- }
-
- Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size());
- Assert.assertEquals("Leader shouldn't set new epoch until quorum of participants is in connectingFollowers",
- lastAcceptedEpoch, peer.getAcceptedEpoch());
- Assert.assertTrue(leader.waitingForNewEpoch);
- try {
- // Observer asks for epoch (mocking LearnerHandler behavior)
- leader.getEpochToPropose(observerId, lastAcceptedEpoch);
- } catch (InterruptedException e) {
- // ignore timeout
- }
-
- Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size());
- Assert.assertEquals("Leader shouldn't set new epoch after observer asks for epoch",
- lastAcceptedEpoch, peer.getAcceptedEpoch());
- Assert.assertTrue(leader.waitingForNewEpoch);
- try {
- // Now participant asks for epoch (mocking LearnerHandler behavior). Second add to connectingFollowers.
- // Triggers verifier.containsQuorum = true
- leader.getEpochToPropose(participantId, lastAcceptedEpoch);
- } catch (Exception e) {
- Assert.fail("Timed out in getEpochToPropose");
- }
-
- Assert.assertEquals("Unexpected vote in connectingFollowers", 2, leader.connectingFollowers.size());
- Assert.assertEquals("Leader should record next epoch", lastAcceptedEpoch + 1, peer.getAcceptedEpoch());
- Assert.assertFalse(leader.waitingForNewEpoch);
- }
-
- @Test
- public void testWaitForEpochAck() throws Exception {
- // things needed for waitForEpochAck to run (usually in leader.lead(), but we're not running leader here)
- leader.leaderStateSummary = new StateSummary(leader.self.getCurrentEpoch(), leader.zk.getLastProcessedZxid());
-
- Assert.assertEquals("Unexpected vote in electingFollowers", 0, leader.electingFollowers.size());
- Assert.assertFalse(leader.electionFinished);
- try {
- // leader calls waitForEpochAck, first add to electingFollowers
- leader.waitForEpochAck(peer.getId(), new StateSummary(0, 0));
- } catch (InterruptedException e) {
- // ignore timeout
- }
-
- Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size());
- Assert.assertFalse(leader.electionFinished);
- try {
- // observer calls waitForEpochAck, should fail verifier.containsQuorum
- leader.waitForEpochAck(observerId, new StateSummary(0, 0));
- } catch (InterruptedException e) {
- // ignore timeout
- }
-
- Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size());
- Assert.assertFalse(leader.electionFinished);
- try {
- // second add to electingFollowers, verifier.containsQuorum=true, waitForEpochAck returns without exceptions
- leader.waitForEpochAck(participantId, new StateSummary(0, 0));
- Assert.assertEquals("Unexpected vote in electingFollowers", 2, leader.electingFollowers.size());
- Assert.assertTrue(leader.electionFinished);
- } catch (Exception e) {
- Assert.fail("Timed out in waitForEpochAck");
- }
- }
-
- @Test
- public void testWaitForNewLeaderAck() throws Exception {
- long zxid = leader.zk.getZxid();
-
- // things needed for waitForNewLeaderAck to run (usually in leader.lead(), but we're not running leader here)
- leader.newLeaderProposal.packet = new QuorumPacket(0, zxid, null, null);
- leader.newLeaderProposal.addQuorumVerifier(peer.getQuorumVerifier());
-
- Set<Long> ackSet = leader.newLeaderProposal.qvAcksetPairs.get(0).getAckset();
- Assert.assertEquals("Unexpected vote in ackSet", 0, ackSet.size());
- Assert.assertFalse(leader.quorumFormed);
- try {
- // leader calls waitForNewLeaderAck, first add to ackSet
- leader.waitForNewLeaderAck(peer.getId(), zxid);
- } catch (InterruptedException e) {
- // ignore timeout
- }
-
- Assert.assertEquals("Unexpected vote in ackSet", 1, ackSet.size());
- Assert.assertFalse(leader.quorumFormed);
- try {
- // observer calls waitForNewLeaderAck, should fail verifier.containsQuorum
- leader.waitForNewLeaderAck(observerId, zxid);
- } catch (InterruptedException e) {
- // ignore timeout
- }
-
- Assert.assertEquals("Unexpected vote in ackSet", 1, ackSet.size());
- Assert.assertFalse(leader.quorumFormed);
- try {
- // second add to ackSet, verifier.containsQuorum=true, waitForNewLeaderAck returns without exceptions
- leader.waitForNewLeaderAck(participantId, zxid);
- Assert.assertEquals("Unexpected vote in ackSet", 2, ackSet.size());
- Assert.assertTrue(leader.quorumFormed);
- } catch (Exception e) {
- Assert.fail("Timed out in waitForEpochAck");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
deleted file mode 100644
index f238971..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
+++ /dev/null
@@ -1,592 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.net.Socket;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.TxnLogProposalIterator;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.server.util.ZxidUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LearnerHandlerTest extends ZKTestCase {
- protected static final Logger LOG = LoggerFactory
- .getLogger(LearnerHandlerTest.class);
-
- class MockLearnerHandler extends LearnerHandler {
- boolean threadStarted = false;
-
- MockLearnerHandler(Socket sock, Leader leader) throws IOException {
- super(sock, new BufferedInputStream(sock.getInputStream()), leader);
- }
-
- protected void startSendingPackets() {
- threadStarted = true;
- }
- }
-
- class MockZKDatabase extends ZKDatabase {
- long lastProcessedZxid;
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- LinkedList<Proposal> committedLog = new LinkedList<Leader.Proposal>();
- LinkedList<Proposal> txnLog = new LinkedList<Leader.Proposal>();
-
- public MockZKDatabase(FileTxnSnapLog snapLog) {
- super(snapLog);
- }
-
- public long getDataTreeLastProcessedZxid() {
- return lastProcessedZxid;
- }
-
- public long getmaxCommittedLog() {
- if (!committedLog.isEmpty()) {
- return committedLog.getLast().packet.getZxid();
- }
- return 0;
- }
-
- public long getminCommittedLog() {
- if (!committedLog.isEmpty()) {
- return committedLog.getFirst().packet.getZxid();
- }
- return 0;
- }
-
- public List<Proposal> getCommittedLog() {
- return committedLog;
- }
-
- public ReentrantReadWriteLock getLogLock() {
- return lock;
- }
-
- public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid,
- long limit) {
- if (peerZxid >= txnLog.peekFirst().packet.getZxid()) {
- return txnLog.iterator();
- } else {
- return (new LinkedList<Proposal>()).iterator();
- }
-
- }
-
- public long calculateTxnLogSizeLimit() {
- return 1;
- }
- }
-
- private MockLearnerHandler learnerHandler;
- private Socket sock;
-
- // Member variables for mocking Leader
- private Leader leader;
- private long currentZxid;
-
- // Member variables for mocking ZkDatabase
- private MockZKDatabase db;
-
- @Before
- public void setUp() throws Exception {
- // Intercept when startForwarding is called
- leader = mock(Leader.class);
- when(
- leader.startForwarding(Matchers.any(LearnerHandler.class),
- Matchers.anyLong())).thenAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) {
- currentZxid = (Long) invocation.getArguments()[1];
- return 0;
- }
- });
-
- sock = mock(Socket.class);
-
- db = new MockZKDatabase(null);
- learnerHandler = new MockLearnerHandler(sock, leader);
- }
-
- Proposal createProposal(long zxid) {
- Proposal p = new Proposal();
- p.packet = new QuorumPacket();
- p.packet.setZxid(zxid);
- p.packet.setType(Leader.PROPOSAL);
- return p;
- }
-
- /**
- * Validate that queued packets contains proposal in the following orders as
- * a given array of zxids
- *
- * @param zxids
- */
- public void queuedPacketMatches(long[] zxids) {
- int index = 0;
- for (QuorumPacket qp : learnerHandler.getQueuedPackets()) {
- if (qp.getType() == Leader.PROPOSAL) {
- assertZxidEquals(zxids[index++], qp.getZxid());
- }
- }
- }
-
- void reset() {
- learnerHandler.getQueuedPackets().clear();
- learnerHandler.threadStarted = false;
- learnerHandler.setFirstPacket(true);
- }
-
- /**
- * Check if op packet (first packet in the queue) match the expected value
- * @param type - type of packet
- * @param zxid - zxid in the op packet
- * @param currentZxid - last packet queued by syncFollower,
- * before invoking startForwarding()
- */
- public void assertOpType(int type, long zxid, long currentZxid) {
- Queue<QuorumPacket> packets = learnerHandler.getQueuedPackets();
- assertTrue(packets.size() > 0);
- assertEquals(type, packets.peek().getType());
- assertZxidEquals(zxid, packets.peek().getZxid());
- assertZxidEquals(currentZxid, this.currentZxid);
- }
-
- void assertZxidEquals(long expected, long value) {
- assertEquals("Expected 0x" + Long.toHexString(expected) + " but was 0x"
- + Long.toHexString(value), expected, value);
- }
-
- /**
- * Test cases when leader has empty commitedLog
- */
- @Test
- public void testEmptyCommittedLog() throws Exception {
- long peerZxid;
-
- // Peer has newer zxid
- peerZxid = 3;
- db.lastProcessedZxid = 1;
- db.committedLog.clear();
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send TRUNC and forward any packet starting lastProcessedZxid
- assertOpType(Leader.TRUNC, db.lastProcessedZxid, db.lastProcessedZxid);
- reset();
-
- // Peer is already sync
- peerZxid = 1;
- db.lastProcessedZxid = 1;
- db.committedLog.clear();
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF and forward any packet starting lastProcessedZxid
- assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
- assertEquals(1, learnerHandler.getQueuedPackets().size());
- reset();
-
- // Peer has 0 zxid (new machine turn up), txnlog
- // is disabled
- peerZxid = 0;
- db.setSnapshotSizeFactor(-1);
- db.lastProcessedZxid = 1;
- db.committedLog.clear();
- // We send SNAP
- assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
- assertEquals(0, learnerHandler.getQueuedPackets().size());
- reset();
-
- }
-
- /**
- * Test cases when leader has committedLog
- */
- @Test
- public void testCommittedLog() throws Exception {
- long peerZxid;
-
- // Commit proposal may lag behind data tree, but it shouldn't affect
- // us in any case
- db.lastProcessedZxid = 6;
- db.committedLog.add(createProposal(2));
- db.committedLog.add(createProposal(3));
- db.committedLog.add(createProposal(5));
-
- // Peer has zxid that we have never seen
- peerZxid = 4;
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send TRUNC to 3 and forward any packet starting 5
- assertOpType(Leader.TRUNC, 3, 5);
- // DIFF + 1 proposals + 1 commit
- assertEquals(3, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { 5 });
- reset();
-
- // Peer is within committedLog range
- peerZxid = 2;
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF and forward any packet starting lastProcessedZxid
- assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
- db.getmaxCommittedLog());
- // DIFF + 2 proposals + 2 commit
- assertEquals(5, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { 3, 5 });
- reset();
-
- // Peer miss the committedLog and txnlog is disabled
- peerZxid = 1;
- db.setSnapshotSizeFactor(-1);
- // We send SNAP
- assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
- assertEquals(0, learnerHandler.getQueuedPackets().size());
- reset();
- }
-
- /**
- * Test cases when txnlog is enabled
- */
- @Test
- public void testTxnLog() throws Exception {
- long peerZxid;
- db.txnLog.add(createProposal(2));
- db.txnLog.add(createProposal(3));
- db.txnLog.add(createProposal(5));
- db.txnLog.add(createProposal(6));
- db.txnLog.add(createProposal(7));
- db.txnLog.add(createProposal(8));
- db.txnLog.add(createProposal(9));
-
- db.lastProcessedZxid = 9;
- db.committedLog.add(createProposal(6));
- db.committedLog.add(createProposal(7));
- db.committedLog.add(createProposal(8));
-
- // Peer has zxid that we have never seen
- peerZxid = 4;
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
- assertOpType(Leader.TRUNC, 3, db.getmaxCommittedLog());
- // DIFF + 4 proposals + 4 commit
- assertEquals(9, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { 5, 6, 7, 8 });
- reset();
-
- // Peer zxid is in txnlog range
- peerZxid = 3;
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF and forward any packet starting at maxCommittedLog
- assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
- db.getmaxCommittedLog());
- // DIFF + 4 proposals + 4 commit
- assertEquals(9, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { 5, 6, 7, 8 });
- reset();
-
- }
-
- /**
- * Test case verifying TxnLogProposalIterator closure.
- */
- @Test
- public void testTxnLogProposalIteratorClosure() throws Exception {
- long peerZxid;
-
- // CommmitedLog is empty, we will use txnlog up to lastProcessZxid
- db = new MockZKDatabase(null) {
- @Override
- public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid,
- long limit) {
- return TxnLogProposalIterator.EMPTY_ITERATOR;
- }
- };
- db.lastProcessedZxid = 7;
- db.txnLog.add(createProposal(2));
- db.txnLog.add(createProposal(3));
-
- // Peer zxid
- peerZxid = 4;
- assertTrue("Couldn't identify snapshot transfer!",
- learnerHandler.syncFollower(peerZxid, db, leader));
- reset();
- }
-
- /**
- * Test cases when txnlog is enabled and commitedLog is empty
- */
- @Test
- public void testTxnLogOnly() throws Exception {
- long peerZxid;
-
- // CommmitedLog is empty, we will use txnlog up to lastProcessZxid
- db.lastProcessedZxid = 7;
- db.txnLog.add(createProposal(2));
- db.txnLog.add(createProposal(3));
- db.txnLog.add(createProposal(5));
- db.txnLog.add(createProposal(6));
- db.txnLog.add(createProposal(7));
- db.txnLog.add(createProposal(8));
-
- // Peer has zxid that we have never seen
- peerZxid = 4;
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send TRUNC to 3 and forward any packet starting at
- // lastProcessedZxid
- assertOpType(Leader.TRUNC, 3, db.lastProcessedZxid);
- // DIFF + 3 proposals + 3 commit
- assertEquals(7, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { 5, 6, 7 });
- reset();
-
- // Peer has zxid in txnlog range
- peerZxid = 2;
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF and forward any packet starting at lastProcessedZxid
- assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
- // DIFF + 4 proposals + 4 commit
- assertEquals(9, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { 3, 5, 6, 7 });
- reset();
-
- // Peer miss the txnlog
- peerZxid = 1;
- assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send snap
- assertEquals(0, learnerHandler.getQueuedPackets().size());
- reset();
- }
-
- long getZxid(long epoch, long counter){
- return ZxidUtils.makeZxid(epoch, counter);
- }
-
- /**
- * Test cases with zxids that are negative long
- */
- @Test
- public void testTxnLogWithNegativeZxid() throws Exception {
- long peerZxid;
- db.txnLog.add(createProposal(getZxid(0xf, 2)));
- db.txnLog.add(createProposal(getZxid(0xf, 3)));
- db.txnLog.add(createProposal(getZxid(0xf, 5)));
- db.txnLog.add(createProposal(getZxid(0xf, 6)));
- db.txnLog.add(createProposal(getZxid(0xf, 7)));
- db.txnLog.add(createProposal(getZxid(0xf, 8)));
- db.txnLog.add(createProposal(getZxid(0xf, 9)));
-
- db.lastProcessedZxid = getZxid(0xf, 9);
- db.committedLog.add(createProposal(getZxid(0xf, 6)));
- db.committedLog.add(createProposal(getZxid(0xf, 7)));
- db.committedLog.add(createProposal(getZxid(0xf, 8)));
-
- // Peer has zxid that we have never seen
- peerZxid = getZxid(0xf, 4);
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
- assertOpType(Leader.TRUNC, getZxid(0xf, 3), db.getmaxCommittedLog());
- // DIFF + 4 proposals + 4 commit
- assertEquals(9, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { getZxid(0xf, 5),
- getZxid(0xf, 6), getZxid(0xf, 7), getZxid(0xf, 8) });
- reset();
-
- // Peer zxid is in txnlog range
- peerZxid = getZxid(0xf, 3);
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF and forward any packet starting at maxCommittedLog
- assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
- db.getmaxCommittedLog());
- // DIFF + 4 proposals + 4 commit
- assertEquals(9, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { getZxid(0xf, 5),
- getZxid(0xf, 6), getZxid(0xf, 7), getZxid(0xf, 8) });
- reset();
- }
-
- /**
- * Test cases when peer has new-epoch zxid
- */
- @Test
- public void testNewEpochZxid() throws Exception {
- long peerZxid;
- db.txnLog.add(createProposal(getZxid(0, 1)));
- db.txnLog.add(createProposal(getZxid(1, 1)));
- db.txnLog.add(createProposal(getZxid(1, 2)));
-
- // After leader election, lastProcessedZxid will point to new epoch
- db.lastProcessedZxid = getZxid(2, 0);
- db.committedLog.add(createProposal(getZxid(1, 1)));
- db.committedLog.add(createProposal(getZxid(1, 2)));
-
- // Peer has zxid of epoch 0
- peerZxid = getZxid(0, 0);
- // We should get snap, we can do better here, but the main logic is
- // that we should never send diff if we have never seen any txn older
- // than peer zxid
- assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
- assertEquals(0, learnerHandler.getQueuedPackets().size());
- reset();
-
- // Peer has zxid of epoch 1
- peerZxid = getZxid(1, 0);
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
- assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
- // DIFF + 2 proposals + 2 commit
- assertEquals(5, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { getZxid(1, 1), getZxid(1, 2)});
- reset();
-
- // Peer has zxid of epoch 2, so it is already sync
- peerZxid = getZxid(2, 0);
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF to (2, 0) and forward any packet starting at (2, 0)
- assertOpType(Leader.DIFF, getZxid(2, 0), getZxid(2, 0));
- // DIFF only
- assertEquals(1, learnerHandler.getQueuedPackets().size());
- reset();
-
- }
-
- /**
- * Test cases when learner has new-epcoh zxid
- * (zxid & 0xffffffffL) == 0;
- */
- @Test
- public void testNewEpochZxidWithTxnlogOnly() throws Exception {
- long peerZxid;
- db.txnLog.add(createProposal(getZxid(1, 1)));
- db.txnLog.add(createProposal(getZxid(2, 1)));
- db.txnLog.add(createProposal(getZxid(2, 2)));
- db.txnLog.add(createProposal(getZxid(4, 1)));
-
- // After leader election, lastProcessedZxid will point to new epoch
- db.lastProcessedZxid = getZxid(6, 0);
-
- // Peer has zxid of epoch 3
- peerZxid = getZxid(3, 0);
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF to (6,0) and forward any packet starting at (4,1)
- assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1));
- // DIFF + 1 proposals + 1 commit
- assertEquals(3, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { getZxid(4, 1)});
- reset();
-
- // Peer has zxid of epoch 4
- peerZxid = getZxid(4, 0);
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF to (6,0) and forward any packet starting at (4,1)
- assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1));
- // DIFF + 1 proposals + 1 commit
- assertEquals(3, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { getZxid(4, 1)});
- reset();
-
- // Peer has zxid of epoch 5
- peerZxid = getZxid(5, 0);
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF to (6,0) and forward any packet starting at (5,0)
- assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(5, 0));
- // DIFF only
- assertEquals(1, learnerHandler.getQueuedPackets().size());
- reset();
-
- // Peer has zxid of epoch 6
- peerZxid = getZxid(6, 0);
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF to (6,0) and forward any packet starting at (6, 0)
- assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(6, 0));
- // DIFF only
- assertEquals(1, learnerHandler.getQueuedPackets().size());
- reset();
- }
-
- /**
- * Test cases when there is a duplicate txn in the committedLog. This
- * should never happen unless there is a bug in initialization code
- * but the learner should never see duplicate packets
- */
- @Test
- public void testDuplicatedTxn() throws Exception {
- long peerZxid;
- db.txnLog.add(createProposal(getZxid(0, 1)));
- db.txnLog.add(createProposal(getZxid(1, 1)));
- db.txnLog.add(createProposal(getZxid(1, 2)));
- db.txnLog.add(createProposal(getZxid(1, 1)));
- db.txnLog.add(createProposal(getZxid(1, 2)));
-
- // After leader election, lastProcessedZxid will point to new epoch
- db.lastProcessedZxid = getZxid(2, 0);
- db.committedLog.add(createProposal(getZxid(1, 1)));
- db.committedLog.add(createProposal(getZxid(1, 2)));
- db.committedLog.add(createProposal(getZxid(1, 1)));
- db.committedLog.add(createProposal(getZxid(1, 2)));
-
- // Peer has zxid of epoch 1
- peerZxid = getZxid(1, 0);
- assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
- // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
- assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
- // DIFF + 2 proposals + 2 commit
- assertEquals(5, learnerHandler.getQueuedPackets().size());
- queuedPacketMatches(new long[] { getZxid(1, 1), getZxid(1, 2)});
- reset();
-
- }
-
- /**
- * Test cases when we have to TRUNC learner, but it may cross epoch boundary
- * so we need to send snap instead
- */
- @Test
- public void testCrossEpochTrunc() throws Exception {
- long peerZxid;
- db.txnLog.add(createProposal(getZxid(1, 1)));
- db.txnLog.add(createProposal(getZxid(2, 1)));
- db.txnLog.add(createProposal(getZxid(2, 2)));
- db.txnLog.add(createProposal(getZxid(4, 1)));
-
- // After leader election, lastProcessedZxid will point to new epoch
- db.lastProcessedZxid = getZxid(6, 0);
-
- // Peer has zxid (3, 1)
- peerZxid = getZxid(3, 1);
- assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
- assertEquals(0, learnerHandler.getQueuedPackets().size());
- reset();
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/faa7cec7/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
deleted file mode 100644
index c2d65e3..0000000
--- a/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LearnerSnapshotThrottlerTest extends ZKTestCase {
- private static final Logger LOG =
- LoggerFactory.getLogger(LearnerSnapshotThrottlerTest.class);
-
- @Test(expected = SnapshotThrottleException.class)
- public void testTooManySnapshotsNonessential() throws Exception {
- LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
- for (int i = 0; i < 6; i++) {
- throttler.beginSnapshot(false);
- }
- }
-
- @Test(expected = SnapshotThrottleException.class)
- public void testTooManySnapshotsEssential() throws Exception {
- LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
- try {
- for (int i = 0; i < 6; i++) {
- throttler.beginSnapshot(true);
- }
- }
- catch (SnapshotThrottleException ex) {
- Assert.fail("essential snapshots should not be throttled");
- }
- throttler.endSnapshot();
- throttler.beginSnapshot(false);
- }
-
- @Test
- public void testNoThrottle() throws Exception {
- LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
- try {
- for (int i = 0; i < 6; i++) {
- throttler.beginSnapshot(true);
- }
- }
- catch (SnapshotThrottleException ex) {
- Assert.fail("essential snapshots should not be throttled");
- }
- throttler.endSnapshot();
- for (int i = 0; i < 5; i++) {
- throttler.endSnapshot();
- throttler.beginSnapshot(false);
- }
- }
-
- @Test
- public void testTryWithResourceNoThrottle() throws Exception {
- LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
- for (int i = 0; i < 3; i++) {
- LearnerSnapshot snapshot = throttler.beginSnapshot(false);
- try {
- Assert.assertFalse(snapshot.isEssential());
- Assert.assertEquals(1, snapshot.getConcurrentSnapshotNumber());
- } finally {
- snapshot.close();
- }
- }
- }
-
- @Test(expected = SnapshotThrottleException.class)
- public void testTryWithResourceThrottle() throws Exception {
- LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
- LearnerSnapshot outer = throttler.beginSnapshot(true);
- try {
- LearnerSnapshot inner = throttler.beginSnapshot(false);
- try {
- Assert.fail("shouldn't be able to have both snapshots open");
- } finally {
- inner.close();
- }
- } finally {
- outer.close();
- }
- }
-
- @Test
- public void testParallelNoThrottle() throws Exception {
- final int numThreads = 50;
-
- final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(numThreads);
- ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
- final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
- final CountDownLatch snapshotProgressLatch = new CountDownLatch(numThreads);
-
- List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(numThreads);
- for (int i = 0; i < numThreads; i++) {
- results.add(threadPool.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() {
- threadStartLatch.countDown();
- try {
- threadStartLatch.await();
-
- throttler.beginSnapshot(false);
-
- snapshotProgressLatch.countDown();
- snapshotProgressLatch.await();
-
- throttler.endSnapshot();
- }
- catch (Exception e) {
- return false;
- }
-
- return true;
- }
- }));
- }
-
- for (Future<Boolean> result : results) {
- Assert.assertTrue(result.get());
- }
- }
-
- @Test
- public void testPositiveTimeout() throws Exception {
- final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1, 200);
- ExecutorService threadPool = Executors.newFixedThreadPool(1);
-
- LearnerSnapshot first = throttler.beginSnapshot(false);
- final CountDownLatch snapshotProgressLatch = new CountDownLatch(1);
-
- Future<Boolean> result = threadPool.submit(new Callable<Boolean>() {
- @Override
- public Boolean call() {
- try {
- snapshotProgressLatch.countDown();
- LearnerSnapshot second = throttler.beginSnapshot(false);
- second.close();
- }
- catch (Exception e) {
- return false;
- }
-
- return true;
- }
- });
-
- snapshotProgressLatch.await();
-
- first.close();
-
- Assert.assertTrue(result.get());
- }
-
- @Test
- public void testHighContentionWithTimeout() throws Exception {
- int numThreads = 20;
-
- final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(2, 5000);
- ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
- final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
-
- List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(numThreads);
- for (int i = 0; i < numThreads; i++) {
- results.add(threadPool.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() {
- threadStartLatch.countDown();
- try {
- threadStartLatch.await();
-
- LearnerSnapshot snap = throttler.beginSnapshot(false);
-
- int snapshotNumber = snap.getConcurrentSnapshotNumber();
-
- throttler.endSnapshot();
-
- return snapshotNumber <= 2;
- }
- catch (Exception e) {
- LOG.error("Exception trying to begin snapshot", e);
- return false;
- }
- }
- }));
- }
-
- for (Future<Boolean> result : results) {
- Assert.assertTrue(result.get());
- }
- }
-}
|