Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C982A11BFB for ; Fri, 18 Jul 2014 18:49:20 +0000 (UTC) Received: (qmail 9475 invoked by uid 500); 18 Jul 2014 18:49:20 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 9448 invoked by uid 500); 18 Jul 2014 18:49:20 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 9437 invoked by uid 99); 18 Jul 2014 18:49:20 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Jul 2014 18:49:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Jul 2014 18:49:17 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6BAA7238890D; Fri, 18 Jul 2014 18:48:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1611765 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ Date: Fri, 18 Jul 2014 18:48:57 -0000 To: commits@zookeeper.apache.org From: fpj@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140718184857.6BAA7238890D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fpj Date: Fri Jul 18 18:48:56 2014 New Revision: 1611765 URL: http://svn.apache.org/r1611765 Log: ZOOKEEPER-1807. 
Observers spam each other creating connections to the election addr (Alex Shraer via fpj) Modified: zookeeper/trunk/CHANGES.txt zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java Modified: zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1611765&r1=1611764&r2=1611765&view=diff ============================================================================== --- zookeeper/trunk/CHANGES.txt (original) +++ zookeeper/trunk/CHANGES.txt Fri Jul 18 18:48:56 2014 @@ -708,6 +708,9 @@ BUGFIXES: ZOOKEEPER-1851. Follower and Observer Request Processors Do Not Forward create2 Requests (Chris Chen via rakeshr) + ZOOKEEPER-1807. Observers spam each other creating connections to the + election addr (Alex Shraer via fpj) + IMPROVEMENTS: ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports, Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1611765&r1=1611764&r2=1611765&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Fri Jul 18 18:48:56 2014 @@ -315,7 +315,7 @@ public class FastLeaderElection implemen * If it is from a non-voting server (such as an observer or * a non-voting follower), respond right away. 
*/ - if(!self.getVotingView().containsKey(response.sid)){ + if(!self.getCurrentAndNextConfigVoters().contains(response.sid)) { Vote current = self.getCurrentVote(); QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend(ToSend.mType.notification, @@ -658,7 +658,7 @@ public class FastLeaderElection implemen * Send notifications to all peers upon a change in our vote */ private void sendNotifications() { - for (long sid : self.getAllKnownServerIds()) { + for (long sid : self.getCurrentAndNextConfigVoters()) { QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, @@ -718,31 +718,36 @@ public class FastLeaderElection implemen } /** - * Termination predicate. Given a set of votes, determines if - * have sufficient to declare the end of the election round. - * - * @param votes Set of votes - * @param vote Identifier of the vote received last - */ - private boolean termPredicate( - HashMap votes, - Vote vote) { - - HashSet set = new HashSet(); + * Termination predicate. Given a set of votes, determines if have + * sufficient to declare the end of the election round. + * + * @param votes + * Set of votes + * @param vote + * Identifier of the vote received last + */ + private boolean termPredicate(HashMap votes, Vote vote) { + SyncedLearnerTracker voteSet = new SyncedLearnerTracker(); + voteSet.addQuorumVerifier(self.getQuorumVerifier()); + if (self.getLastSeenQuorumVerifier() != null + && self.getLastSeenQuorumVerifier().getVersion() > self + .getQuorumVerifier().getVersion()) { + voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier()); + } /* - * First make the views consistent. Sometimes peers will have - * different zxids for a server depending on timing. + * First make the views consistent. Sometimes peers will have different + * zxids for a server depending on timing. 
*/ - for (Map.Entry entry : votes.entrySet()) { - if (self.getQuorumVerifier().getVotingMembers().containsKey(entry.getKey()) - && vote.equals(entry.getValue())){ - set.add(entry.getKey()); + for (Map.Entry entry : votes.entrySet()) { + if (vote.equals(entry.getValue())) { + voteSet.addAck(entry.getKey()); } } - return self.getQuorumVerifier().containsQuorum(set); + return voteSet.hasAllQuorums(); } + /** * In the case there is a leader elected, and a quorum supporting * this leader, we have to check if the leader has voted and acked @@ -914,10 +919,10 @@ public class FastLeaderElection implemen notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); - } - else if(self.getVotingView().containsKey(n.sid)) { + } + else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) { /* - * Only proceed if the vote comes from a replica in the + * Only proceed if the vote comes from a replica in the current or next * voting view. 
*/ switch (n.state) { Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1611765&r1=1611764&r2=1611765&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Jul 18 18:48:56 2014 @@ -1107,12 +1107,14 @@ public class QuorumPeer extends ZooKeepe return getQuorumVerifier().getObservingMembers(); } - public synchronized Set getAllKnownServerIds(){ - Set tmp = new HashSet(getQuorumVerifier().getAllMembers().keySet()); - if (getLastSeenQuorumVerifier()!=null) { - tmp.addAll(getLastSeenQuorumVerifier().getAllMembers().keySet()); - } - return tmp; + public synchronized Set getCurrentAndNextConfigVoters() { + Set voterIds = new HashSet(getQuorumVerifier() + .getVotingMembers().keySet()); + if (getLastSeenQuorumVerifier() != null) { + voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers() + .keySet()); + } + return voterIds; } /** Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1611765&r1=1611764&r2=1611765&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Fri Jul 18 18:48:56 2014 @@ -55,21 +55,6 @@ public class QuorumPeerTestBase extends } } - public static class MainThreadReconfigRecovery extends MainThread { - final File nextDynamicConfigFile; - - public MainThreadReconfigRecovery(int 
myid, int clientPort, - String currentQuorumCfgSection, String nextQuorumCfgSection) - throws IOException { - super(myid, clientPort, currentQuorumCfgSection); - nextDynamicConfigFile = new File(tmpDir, "zoo.dynamic.next"); - FileWriter fwriter = new FileWriter(nextDynamicConfigFile); - fwriter.write(nextQuorumCfgSection + "\n"); - fwriter.flush(); - fwriter.close(); - } - } - public static class MainThread implements Runnable { final File confFile; final File dynamicConfigFile; @@ -141,7 +126,16 @@ public class QuorumPeerTestBase extends fwriter.flush(); fwriter.close(); } - + + public void writeTempDynamicConfigFile(String nextQuorumCfgSection) + throws IOException { + File nextDynamicConfigFile = new File(tmpDir, "zoo.dynamic.next"); + FileWriter fwriter = new FileWriter(nextDynamicConfigFile); + fwriter.write(nextQuorumCfgSection + "\n"); + fwriter.flush(); + fwriter.close(); + } + Thread currentThread; synchronized public void start() { Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java?rev=1611765&r1=1611764&r2=1611765&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java Fri Jul 18 18:48:56 2014 @@ -21,6 +21,7 @@ package org.apache.zookeeper.server.quor import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; import java.util.ArrayList; +import java.util.HashSet; import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.ZooKeeper; @@ -31,13 +32,13 @@ import org.junit.Test; public class ReconfigRecoveryTest extends QuorumPeerTestBase { /** - * Reconfiguration recovery - test that a reconfiguration is completed - * if leader has 
.next file during startup and new config is not running yet + * Reconfiguration recovery - test that a reconfiguration is completed if + * leader has .next file during startup and new config is not running yet */ @Test public void testNextConfigCompletion() throws Exception { ClientBase.setupTestEnv(); - + // 2 servers in current config, 3 in next config final int SERVER_COUNT = 3; final int clientPorts[] = new int[SERVER_COUNT]; @@ -46,96 +47,84 @@ public class ReconfigRecoveryTest extend ArrayList allServers = new ArrayList(); String currentQuorumCfgSection = null, nextQuorumCfgSection; - - for(int i = 0; i < SERVER_COUNT; i++) { - clientPorts[i] = PortAssignment.unique(); - server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + - ":participant;localhost:" + clientPorts[i]; - allServers.add(server); - sb.append(server +"\n"); - if (i == 1) currentQuorumCfgSection = sb.toString(); + + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." 
+ i + "=localhost:" + PortAssignment.unique() + + ":" + PortAssignment.unique() + ":participant;localhost:" + + clientPorts[i]; + allServers.add(server); + sb.append(server + "\n"); + if (i == 1) + currentQuorumCfgSection = sb.toString() + "version=100000000\n"; } sb.append("version=200000000\n"); // version of current config is 100000000 nextQuorumCfgSection = sb.toString(); - + // Both servers 0 and 1 will have the .next config file, which means // for them that a reconfiguration was in progress when they failed // and the leader will complete it MainThread mt[] = new MainThread[SERVER_COUNT]; ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; for (int i = 0; i < SERVER_COUNT - 1; i++) { - mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection); + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection); + // note that we should run the server, shut it down and only then + // simulate a reconfig in progress by writing the temp file, but here no + // other server is competing with them in FLE, so we can skip this step + // (server 2 is booted after FLE ends) + mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection); mt[i].start(); - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], + ClientBase.CONNECTION_TIMEOUT, this); } - - Assert.assertTrue("waiting for server 0 being up", - ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0], + + Assert.assertTrue("waiting for server 0 being up", ClientBase + .waitForServerUp("127.0.0.1:" + clientPorts[0], CONNECTION_TIMEOUT)); - Assert.assertTrue("waiting for server 1 being up", - ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1], + Assert.assertTrue("waiting for server 1 being up", ClientBase + .waitForServerUp("127.0.0.1:" + clientPorts[1], CONNECTION_TIMEOUT)); - - int leader = mt[0].main.quorumPeer.leader == null ? 
1: 0; - + + int leader = mt[0].main.quorumPeer.leader == null ? 1 : 0; + // the new server's config is going to include itself and the current leader sb = new StringBuilder(); sb.append(allServers.get(leader) + "\n"); sb.append(allServers.get(2) + "\n"); - + // suppose that this new server never heard about the reconfig proposal String newServerInitialConfig = sb.toString(); mt[2] = new MainThread(2, clientPorts[2], newServerInitialConfig); mt[2].start(); - zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this); + zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], + ClientBase.CONNECTION_TIMEOUT, this); - Assert.assertTrue("waiting for server 2 being up", - ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2], + Assert.assertTrue("waiting for server 2 being up", ClientBase + .waitForServerUp("127.0.0.1:" + clientPorts[2], CONNECTION_TIMEOUT)); - - ReconfigTest.testServerHasConfig(zk[0], allServers, null); - ReconfigTest.testServerHasConfig(zk[1], allServers, null); - ReconfigTest.testServerHasConfig(zk[2], allServers, null); - - ReconfigTest.testNormalOperation(zk[0], zk[2]); - ReconfigTest.testNormalOperation(zk[2], zk[1]); - - zk[2].close(); - mt[2].shutdown(); - - //now suppose that the new server heard the reconfig request - mt[2] = new MainThreadReconfigRecovery(2, clientPorts[2], newServerInitialConfig, nextQuorumCfgSection); - mt[2].start(); - zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this); - Assert.assertTrue("waiting for server 2 being up", - ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2], - CONNECTION_TIMEOUT)); - ReconfigTest.testServerHasConfig(zk[0], allServers, null); ReconfigTest.testServerHasConfig(zk[1], allServers, null); ReconfigTest.testServerHasConfig(zk[2], allServers, null); - + ReconfigTest.testNormalOperation(zk[0], zk[2]); ReconfigTest.testNormalOperation(zk[2], zk[1]); for (int i = 0; i < SERVER_COUNT; i++) { - zk[i].close(); - } - for (int i = 
0; i < SERVER_COUNT; i++) { mt[i].shutdown(); + zk[i].close(); } } /** * Reconfiguration recovery - current config servers discover .next file, - * but they're both observers and their ports change in next config. Suppose that next config wasn't activated yet. - * Should complete reconfiguration. + * but they're both observers and their ports change in next config. Suppose + * that next config wasn't activated yet. Should complete reconfiguration. */ @Test public void testCurrentServersAreObserversInNextConfig() throws Exception { ClientBase.setupTestEnv(); - + // 2 servers in current config, 5 in next config final int SERVER_COUNT = 5; final int clientPorts[] = new int[SERVER_COUNT]; @@ -143,82 +132,118 @@ public class ReconfigRecoveryTest extend StringBuilder sb = new StringBuilder(); String server; - String currentQuorumCfgSection = null, nextQuorumCfgSection; - + String currentQuorumCfg = null, currentQuorumCfgSection = null, nextQuorumCfgSection = null; + ArrayList allServersCurrent = new ArrayList(); ArrayList allServersNext = new ArrayList(); - - - for(int i = 0; i < 2; i++) { - oldClientPorts[i] = PortAssignment.unique(); - server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + - ":participant;localhost:" + oldClientPorts[i]; - allServersCurrent.add(server); - sb.append(server +"\n"); + + for (int i = 0; i < 2; i++) { + oldClientPorts[i] = PortAssignment.unique(); + server = "server." + i + "=localhost:" + PortAssignment.unique() + + ":" + PortAssignment.unique() + ":participant;localhost:" + + oldClientPorts[i]; + allServersCurrent.add(server); + sb.append(server + "\n"); } - + + currentQuorumCfg = sb.toString(); + sb.append("version=100000000\n"); currentQuorumCfgSection = sb.toString(); + sb = new StringBuilder(); String role; - for (int i=0; i allServers = new ArrayList(); - for(int i = 0; i < SERVER_COUNT; i++) { - clientPorts[i] = PortAssignment.unique(); - server = "server." 
+ i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + - ":participant;localhost:" + clientPorts[i]; - allServers.add(server); - sb.append(server +"\n"); - if (i == 1) currentQuorumCfgSection = sb.toString(); + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." + i + "=localhost:" + PortAssignment.unique() + + ":" + PortAssignment.unique() + ":participant;localhost:" + + clientPorts[i]; + allServers.add(server); + sb.append(server + "\n"); + if (i == 1) + currentQuorumCfgSection = sb.toString() + "version=100000000\n"; } - sb.append("version=200000000\n"); // version of current config is 100000000 + sb.append("version=200000000\n"); // version of current config is 100000000 nextQuorumCfgSection = sb.toString(); - - // lets start servers 2, 3, 4 with the new config + MainThread mt[] = new MainThread[SERVER_COUNT]; ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; // Both servers 0 and 1 will have the .next config file, which means // for them that a reconfiguration was in progress when they failed - // and the leader will complete it. 
for (int i = 0; i < 2; i++) { - mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection); + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection); + // note that we should run the server, shut it down and only then + // simulate a reconfig in progress by writing the temp file, but here no + // other server is competing with them in FLE, so we can skip this step + mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection); mt[i].start(); - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], + ClientBase.CONNECTION_TIMEOUT, this); } - Thread.sleep(CONNECTION_TIMEOUT*2); - - // make sure servers 0, 1 don't come online + Thread.sleep(CONNECTION_TIMEOUT * 2); + + // make sure servers 0, 1 don't come online - this should be the case + // since they can't complete the reconfig for (int i = 0; i < 2; i++) { - Assert.assertFalse("server " + i + " is up but shouldn't be", + Assert.assertFalse("server " + i + " is up but shouldn't be", ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT / 10)); } - + for (int i = 0; i < 2; i++) { zk[i].close(); - } - for (int i = 0; i < 2; i++) { mt[i].shutdown(); } } - + /** - * Reconfiguration recovery - test that old config members will join the new config - * if its already active, and not try to complete the reconfiguration + * Reconfiguration recovery - test that old config members will join the new + * config if its already active, and not try to complete the reconfiguration */ @Test public void testNextConfigAlreadyActive() throws Exception { ClientBase.setupTestEnv(); - + // 2 servers in current config, 5 in next config final int SERVER_COUNT = 5; final int clientPorts[] = new int[SERVER_COUNT]; @@ -284,75 +313,274 @@ public class ReconfigRecoveryTest extend String server; String currentQuorumCfgSection = null, nextQuorumCfgSection; - + ArrayList allServers = 
new ArrayList(); - for(int i = 0; i < SERVER_COUNT; i++) { - clientPorts[i] = PortAssignment.unique(); - server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + - ":participant;localhost:" + clientPorts[i]; - allServers.add(server); - sb.append(server +"\n"); - if (i == 1) currentQuorumCfgSection = sb.toString(); + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." + i + "=localhost:" + PortAssignment.unique() + + ":" + PortAssignment.unique() + ":participant;localhost:" + + clientPorts[i]; + allServers.add(server); + sb.append(server + "\n"); + if (i == 1) currentQuorumCfgSection = sb.toString() + "version=100000000\n"; } sb.append("version=200000000\n"); // version of current config is 100000000 nextQuorumCfgSection = sb.toString(); - + // lets start servers 2, 3, 4 with the new config MainThread mt[] = new MainThread[SERVER_COUNT]; ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; for (int i = 2; i < SERVER_COUNT; i++) { mt[i] = new MainThread(i, clientPorts[i], nextQuorumCfgSection); mt[i].start(); - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], + ClientBase.CONNECTION_TIMEOUT, this); } - for (int i = 2; i < SERVER_COUNT; i++) { + for (int i = 2; i < SERVER_COUNT; i++) { Assert.assertTrue("waiting for server " + i + " being up", - ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], - CONNECTION_TIMEOUT)); + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], + CONNECTION_TIMEOUT)); } ReconfigTest.testNormalOperation(zk[2], zk[3]); long epoch = mt[2].main.quorumPeer.getAcceptedEpoch(); - + // Both servers 0 and 1 will have the .next config file, which means // for them that a reconfiguration was in progress when they failed - // and the leader will complete it. + // and the leader will complete it. 
for (int i = 0; i < 2; i++) { - mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection); + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection); + mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection); mt[i].start(); - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], + ClientBase.CONNECTION_TIMEOUT, this); } - - // servers 0 and 1 should connect to all servers, including the one in their - // .next file during startup, and will find the next config and join it + // servers 0 and 1 should connect to all servers, including the one in + // their .next file during startup, and will find the next config and join it for (int i = 0; i < 2; i++) { - Assert.assertTrue("waiting for server " + i + " being up", + Assert.assertTrue("waiting for server " + i + " being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], - CONNECTION_TIMEOUT*2)); + CONNECTION_TIMEOUT * 2)); } - + // make sure they joined the new config without any change to it Assert.assertEquals(epoch, mt[0].main.quorumPeer.getAcceptedEpoch()); Assert.assertEquals(epoch, mt[1].main.quorumPeer.getAcceptedEpoch()); Assert.assertEquals(epoch, mt[2].main.quorumPeer.getAcceptedEpoch()); - + ReconfigTest.testServerHasConfig(zk[0], allServers, null); ReconfigTest.testServerHasConfig(zk[1], allServers, null); - + ReconfigTest.testNormalOperation(zk[0], zk[2]); ReconfigTest.testNormalOperation(zk[4], zk[1]); - for (int i = 0; i < SERVER_COUNT; i++) { zk[i].close(); + mt[i].shutdown(); } + } + + /** + * Tests conversion of observer to participant AFTER new config was already + * committed. Old config: servers 0 (participant), 1 (participant), 2 + * (observer) New config: servers 2 (participant), 3 (participant) We start + * server 2 with old config and start server 3 with new config. All other + * servers are down. 
In order to terminate FLE, server 3 must 'convince' + * server 2 to adopt the new config and turn into a participant. + */ + @Test + public void testObserverConvertedToParticipantDuringFLE() throws Exception { + ClientBase.setupTestEnv(); + + final int SERVER_COUNT = 4; + int[][] ports = generatePorts(SERVER_COUNT); + String currentQuorumCfgSection, nextQuorumCfgSection; + + // generate old config string + HashSet observers = new HashSet(); + observers.add(2); + StringBuilder sb = generateConfig(3, ports, observers); + sb.append("version=100000000"); + currentQuorumCfgSection = sb.toString(); + + // generate new config string + ArrayList allServersNext = new ArrayList(); + sb = new StringBuilder(); + for (int i = 2; i < SERVER_COUNT; i++) { + String server = "server." + i + "=localhost:" + ports[i][0] + ":" + + ports[i][1] + ":participant;localhost:" + ports[i][2]; + allServersNext.add(server); + sb.append(server + "\n"); + } + sb.append("version=200000000"); // version of current config is 100000000 + nextQuorumCfgSection = sb.toString(); + + MainThread mt[] = new MainThread[SERVER_COUNT]; + ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; + + // start server 2 with old config, where it is an observer + mt[2] = new MainThread(2, ports[2][2], currentQuorumCfgSection); + mt[2].start(); + zk[2] = new ZooKeeper("127.0.0.1:" + ports[2][2], + ClientBase.CONNECTION_TIMEOUT, this); + + // start server 3 with new config + mt[3] = new MainThread(3, ports[3][2], nextQuorumCfgSection); + mt[3].start(); + zk[3] = new ZooKeeper("127.0.0.1:" + ports[3][2], + ClientBase.CONNECTION_TIMEOUT, this); + + for (int i = 2; i < SERVER_COUNT; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2], + CONNECTION_TIMEOUT * 2)); + ReconfigTest.testServerHasConfig(zk[i], allServersNext, null); + } + + Assert.assertEquals(nextQuorumCfgSection, + ReconfigTest.testServerHasConfig(zk[2], null, null)); + 
Assert.assertEquals(nextQuorumCfgSection, + ReconfigTest.testServerHasConfig(zk[3], null, null)); + ReconfigTest.testNormalOperation(zk[2], zk[2]); + ReconfigTest.testNormalOperation(zk[3], zk[2]); + + for (int i = 2; i < SERVER_COUNT; i++) { + zk[i].close(); + mt[i].shutdown(); + } + } + + /** + * Tests conversion of observer to participant during reconfig recovery, new + * config was not committed yet. Old config: servers 0 (participant), 1 + * (participant), 2 (observer). New config: servers 2 (participant), 3 + * (participant). We start servers 0, 1, 2 with old config and a .next + * file indicating a reconfig in progress. We start server 3 with old config + * + itself in config file. In this scenario server 2 can't be converted to + * participant during reconfig since we don't gossip about proposed + * configurations, only about committed ones. This tests that new config can + * be completed, which requires server 2's ack for the newleader message, + * even though it's an observer.
+ */ + @Test + public void testCurrentObserverIsParticipantInNewConfig() throws Exception { + ClientBase.setupTestEnv(); + + final int SERVER_COUNT = 4; + int[][] ports = generatePorts(SERVER_COUNT); + String currentQuorumCfg, currentQuorumCfgSection, nextQuorumCfgSection; + + // generate old config string + HashSet observers = new HashSet(); + observers.add(2); + + StringBuilder sb = generateConfig(3, ports, observers); + currentQuorumCfg = sb.toString(); + sb.append("version=100000000"); + currentQuorumCfgSection = sb.toString(); + + // Run servers 0..2 for a while + MainThread mt[] = new MainThread[SERVER_COUNT]; + ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; + for (int i = 0; i <= 2; i++) { + mt[i] = new MainThread(i, ports[i][2], currentQuorumCfgSection); + mt[i].start(); + zk[i] = new ZooKeeper("127.0.0.1:" + ports[i][2], + ClientBase.CONNECTION_TIMEOUT, this); + } + + ReconfigTest.testNormalOperation(zk[0], zk[2]); + + for (int i = 0; i <= 2; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2], + CONNECTION_TIMEOUT * 2)); + } + + // shut servers 0..2 down + for (int i = 0; i <= 2; i++) { + mt[i].shutdown(); + zk[i].close(); + } + + // generate new config string + ArrayList allServersNext = new ArrayList(); + sb = new StringBuilder(); + for (int i = 2; i < SERVER_COUNT; i++) { + String server = "server." 
+ i + "=localhost:" + ports[i][0] + ":" + + ports[i][1] + ":participant;localhost:" + ports[i][2]; + allServersNext.add(server); + sb.append(server + "\n"); + } + sb.append("version=200000000"); // version of current config is 100000000 + nextQuorumCfgSection = sb.toString(); + + // simulate reconfig in progress - servers 0..2 have a temp reconfig + // file when they boot + for (int i = 0; i <= 2; i++) { + mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection); + mt[i].start(); + zk[i] = new ZooKeeper("127.0.0.1:" + ports[i][2], + ClientBase.CONNECTION_TIMEOUT, this); + } + // new server 3 has still its invalid joiner config - everyone in old + // config + itself + mt[3] = new MainThread(3, ports[3][2], currentQuorumCfg + + allServersNext.get(1)); + mt[3].start(); + zk[3] = new ZooKeeper("127.0.0.1:" + ports[3][2], + ClientBase.CONNECTION_TIMEOUT, this); + + for (int i = 2; i < SERVER_COUNT; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2], + CONNECTION_TIMEOUT * 2)); + ReconfigTest.testServerHasConfig(zk[i], allServersNext, null); + } + + ReconfigTest.testNormalOperation(zk[0], zk[2]); + ReconfigTest.testNormalOperation(zk[3], zk[1]); + Assert.assertEquals(nextQuorumCfgSection, + ReconfigTest.testServerHasConfig(zk[2], null, null)); + Assert.assertEquals(nextQuorumCfgSection, + ReconfigTest.testServerHasConfig(zk[3], null, null)); + for (int i = 0; i < SERVER_COUNT; i++) { + zk[i].close(); mt[i].shutdown(); } } - - - -} + + /* + * Generates 3 ports per server + */ + private int[][] generatePorts(int numServers) { + int[][] ports = new int[numServers][]; + for (int i = 0; i < numServers; i++) { + ports[i] = new int[3]; + for (int j = 0; j < 3; j++) { + ports[i][j] = PortAssignment.unique(); + } + } + return ports; + } + + /* + * Creates a configuration string for servers 0..numServers-1 Ids in + * observerIds correspond to observers, other ids are for participants. 
+ */ + private StringBuilder generateConfig(int numServers, int[][] ports, + HashSet<Integer> observerIds) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < numServers; i++) { + String server = "server." + i + "=localhost:" + ports[i][0] + ":" + + ports[i][1] + ":" + + (observerIds.contains(i) ? "observer" : "participant") + + ";localhost:" + ports[i][2]; + sb.append(server + "\n"); + } + return sb; + } +} \ No newline at end of file