Author: breed
Date: Fri Jul 23 06:31:57 2010
New Revision: 966984
URL: http://svn.apache.org/viewvc?rev=966984&view=rev
Log:
ZOOKEEPER-790. Last processed zxid set prematurely while establishing leadership (fpj via
breed)
Modified:
hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java
Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Fri Jul 23 06:31:57 2010
@@ -58,6 +58,8 @@ BUGFIXES:
ZOOKEEPER-766. forrest recipes docs don't mention the lock/queue recipe
implementations available in the release (phunt via mahadev)
+ ZOOKEEPER-790. Last processed zxid set prematurely while establishing leadership (fpj via
breed)
+
Release 3.3.0 - 2010-03-24
Non-backward compatible changes:
Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
(original)
+++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
Fri Jul 23 06:31:57 2010
@@ -193,7 +193,12 @@ public class FileTxnLog implements TxnLo
+ hdr.getType());
}
if (logStream==null) {
- logFileWrite = new File(logDir, ("log." +
+ if(LOG.isInfoEnabled()){
+ LOG.info("Creating new log file: log." +
+ Long.toHexString(hdr.getZxid()));
+ }
+
+ logFileWrite = new File(logDir, ("log." +
Long.toHexString(hdr.getZxid())));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
(original)
+++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
Fri Jul 23 06:31:57 2010
@@ -274,16 +274,15 @@ public class Leader {
try {
self.tick = 0;
zk.loadData();
- zk.startup();
+
long epoch = self.getLastLoggedZxid() >> 32L;
epoch++;
zk.setZxid(epoch << 32L);
- zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
synchronized(this){
lastProposed = zk.getZxid();
}
-
+
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);
@@ -327,6 +326,13 @@ public class Leader {
Thread.sleep(self.tickTime);
self.tick++;
}
+
+ if(LOG.isInfoEnabled()){
+ LOG.info("Have quorum of supporters; starting up and setting last processed
zxid: " + zk.getZxid());
+ }
+ zk.startup();
+ zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
+
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.cnxnFactory.setZooKeeperServer(zk);
}
@@ -466,7 +472,7 @@ public class Leader {
LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
+ " is " + p.ackSet.size());
}
- if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
+ if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
if (zxid != lastCommitted+1) {
LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
+ " from " + followerAddr + " not first!");
@@ -603,6 +609,16 @@ public class Leader {
long lastProposed;
+
+ /**
+ * Returns the current epoch of the leader, i.e. the high 32 bits of the
+ * last proposed zxid (lastProposed >> 32). The value is already shifted,
+ * so callers must not shift it again.
+ *
+ * Synchronized because lastProposed is a long that lead() assigns inside
+ * synchronized(this); an unsynchronized read of a non-volatile long is
+ * neither guaranteed atomic nor guaranteed to be visible across threads.
+ *
+ * @return the epoch of the last proposed zxid
+ */
+ public synchronized long getEpoch(){
+ return lastProposed >> 32L;
+ }
+
/**
* create a proposal and send it out to all the members
*
Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
(original)
+++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
Fri Jul 23 06:31:57 2010
@@ -311,6 +311,9 @@ public class Learner {
System.exit(13);
}
+ if(LOG.isInfoEnabled()){
+ LOG.info("Setting leader epoch " + Long.toHexString(newLeaderZxid >>
32L));
+ }
zk.getZKDatabase().setlastProcessedZxid(newLeaderZxid);
}
ack.setZxid(newLeaderZxid & ~0xffffffffL);
Modified: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java
(original)
+++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java
Fri Jul 23 06:31:57 2010
@@ -211,48 +211,72 @@ public class QuorumBase extends ClientBa
}
JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
}
- public void setupServers() throws IOException {
+
+ /**
+ * Creates quorum peers s1 through s5 by delegating to setupServer(int)
+ * once per server id, in ascending id order.
+ */
+ public void setupServers() throws IOException {
+ for (int id = 1; id <= 5; id++) {
+ setupServer(id);
+ }
+ }
+
+ /**
+ * Peer configuration shared by every QuorumPeer created by setupServer.
+ * Built lazily on the first call and reused afterwards, so a server that is
+ * re-created (e.g. after a shutdown) is given the same peer map.
+ */
+ HashMap<Long,QuorumServer> peers = null;
+
+ /**
+ * Creates QuorumPeer number i and stores it in the matching s1..s5 field,
+ * asserting that it was configured with the expected client port.
+ *
+ * @param i id of the server to create; must be between 1 and 5
+ * @throws IllegalArgumentException if i is not between 1 and 5
+ * @throws IOException if the QuorumPeer cannot be created
+ */
+ public void setupServer(int i) throws IOException {
int tickTime = 2000;
int initLimit = 3;
int syncLimit = 3;
- HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+
+ if(peers == null){
+ peers = new HashMap<Long,QuorumServer>();
- peers.put(Long.valueOf(1), new QuorumServer(1,
+ peers.put(Long.valueOf(1), new QuorumServer(1,
new InetSocketAddress("127.0.0.1", port1 + 1000),
new InetSocketAddress("127.0.0.1", portLE1 + 1000),
LearnerType.PARTICIPANT));
- peers.put(Long.valueOf(2), new QuorumServer(2,
+ peers.put(Long.valueOf(2), new QuorumServer(2,
new InetSocketAddress("127.0.0.1", port2 + 1000),
new InetSocketAddress("127.0.0.1", portLE2 + 1000),
LearnerType.PARTICIPANT));
- peers.put(Long.valueOf(3), new QuorumServer(3,
+ peers.put(Long.valueOf(3), new QuorumServer(3,
new InetSocketAddress("127.0.0.1", port3 + 1000),
new InetSocketAddress("127.0.0.1", portLE3 + 1000),
LearnerType.PARTICIPANT));
- peers.put(Long.valueOf(4), new QuorumServer(4,
+ peers.put(Long.valueOf(4), new QuorumServer(4,
new InetSocketAddress("127.0.0.1", port4 + 1000),
new InetSocketAddress("127.0.0.1", portLE4 + 1000),
LearnerType.PARTICIPANT));
- peers.put(Long.valueOf(5), new QuorumServer(5,
+ peers.put(Long.valueOf(5), new QuorumServer(5,
new InetSocketAddress("127.0.0.1", port5 + 1000),
new InetSocketAddress("127.0.0.1", portLE5 + 1000),
LearnerType.PARTICIPANT));
+ }
- LOG.info("creating QuorumPeer 1 port " + port1);
- s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit);
- assertEquals(port1, s1.getClientPort());
- LOG.info("creating QuorumPeer 2 port " + port2);
- s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit);
- assertEquals(port2, s2.getClientPort());
- LOG.info("creating QuorumPeer 3 port " + port3);
- s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit);
- assertEquals(port3, s3.getClientPort());
- LOG.info("creating QuorumPeer 4 port " + port4);
- s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit);
- assertEquals(port4, s4.getClientPort());
- LOG.info("creating QuorumPeer 5 port " + port5);
- s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit);
- assertEquals(port5, s5.getClientPort());
+ switch(i){
+ case 1:
+ LOG.info("creating QuorumPeer 1 port " + port1);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit);
+ assertEquals(port1, s1.getClientPort());
+ break;
+ case 2:
+ LOG.info("creating QuorumPeer 2 port " + port2);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit);
+ assertEquals(port2, s2.getClientPort());
+ break;
+ case 3:
+ LOG.info("creating QuorumPeer 3 port " + port3);
+ s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit);
+ assertEquals(port3, s3.getClientPort());
+ break;
+ case 4:
+ LOG.info("creating QuorumPeer 4 port " + port4);
+ s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit);
+ assertEquals(port4, s4.getClientPort());
+ break;
+ case 5:
+ LOG.info("creating QuorumPeer 5 port " + port5);
+ s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit);
+ assertEquals(port5, s5.getClientPort());
+ break;
+ default:
+ // Only servers 1-5 exist; fail loudly instead of silently creating nothing.
+ throw new IllegalArgumentException("Invalid server id: " + i);
+ }
}
@After
Modified: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java
(original)
+++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java
Fri Jul 23 06:31:57 2010
@@ -19,6 +19,9 @@
package org.apache.zookeeper.test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
+
import org.apache.log4j.Logger;
import org.apache.zookeeper.AsyncCallback;
@@ -260,5 +263,89 @@ public class QuorumTest extends QuorumBa
}
zk.close();
}
+
+ /**
+ * Tests if closeSession can be logged before a leader gets established, which
+ * could lead to a locked-out follower (see ZOOKEEPER-790).
+ *
+ * The test works as follows. It has a client connecting to a follower f and
+ * sending batches of 1,000 updates. The goal is that f has a zxid higher than
+ * all other servers in the initial leader election. This way we can crash and
+ * recover the follower so that the follower believes it is the leader once it
+ * recovers (LE optimization: once a server receives a message from all other
+ * servers, it picks a leader).
+ *
+ * It also makes the session timeout very short so that we force the false
+ * leader to close the session and write it to the log in the buggy code (before
+ * ZOOKEEPER-790). Once f drops leadership and finds the current leader, its epoch
+ * is higher, and it rejects the leader. Now, if we prevent the leader from closing
+ * the session by only starting up (see Leader.lead()) once it obtains a quorum of
+ * supporters, then f will find the current leader and support it because it won't
+ * have a higher epoch.
+ *
+ */
+ @Test
+ public void testNoLogBeforeLeaderEstablishment ()
+ throws IOException, InterruptedException, KeeperException{
+ final Semaphore sem = new Semaphore(0);
+
+ Leader leader = qb.s1.leader;
+ if (leader == null) leader = qb.s2.leader;
+ if (leader == null) leader = qb.s3.leader;
+ if (leader == null) leader = qb.s4.leader;
+ if (leader == null) leader = qb.s5.leader;
+
+ assertNotNull(leader);
+
+ // Connect the client to s1 (the server crashed and restarted below), or to
+ // s2 if s1 is currently the leader.
+ // NOTE(review): when s1 is the leader, the loop below still crashes s1, i.e.
+ // the leader rather than a follower - confirm this scenario is intended.
+ int serverPort = qb.s1.getClientPort();
+ if(qb.s1.leader != null){
+ serverPort = qb.s2.getClientPort();
+ }
+
+ ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + serverPort, 1000, new Watcher() {
+ public void process(WatchedEvent event) {
+ }});
+
+ zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ for(int i = 0; i < 50000; i++) {
+ zk.setData("/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
+ public void processResult(int rc, String path, Object ctx,
+ Stat stat) {
+ counter++;
+ if (rc != 0) {
+ errors++;
+ }
+ if(counter == 20000){
+ sem.release();
+ }
+ }
+ }, null);
+
+ if(i == 5000){
+ qb.shutdown(qb.s1);
+ LOG.info("Shutting down s1");
+ }
+ if(i == 12000){
+ qb.setupServer(1);
+ qb.s1.start();
+ LOG.info("Setting up s1");
+ }
+ if((i % 1000) == 0){
+ Thread.sleep(500);
+ }
+ }
+
+ // Wait until 20,000 update callbacks have returned (the callback releases
+ // the semaphore when counter reaches 20000); fail instead of silently
+ // continuing if that never happens.
+ assertTrue("Timed out waiting for update callbacks",
+ sem.tryAcquire(15000, TimeUnit.MILLISECONDS));
+
+ // Verify that server is following and has the same epoch as the leader.
+ // Leader.getEpoch() already returns lastProposed >> 32, so it must not be
+ // shifted a second time (that would always yield 0).
+ assertTrue("Not following", qb.s1.follower != null);
+ long epochF = (qb.s1.getActiveServer().getZxid() >> 32L);
+ long epochL = leader.getEpoch();
+ assertTrue("Zxid: " + qb.s1.getActiveServer().getZxid() +
+ "Current epoch: " + epochF, epochF == epochL);
+
+ zk.close();
+ }
// skip superhammer and clientcleanup as they are too expensive for quorum
}
|