Author: fpj
Date: Tue Jan 15 21:13:28 2013
New Revision: 1433651
URL: http://svn.apache.org/viewvc?rev=1433651&view=rev
Log:
ZOOKEEPER-1324. Remove Duplicate NEWLEADER packets from the Leader to the Follower. (thawan
via fpj)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1433651&r1=1433650&r2=1433651&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue Jan 15 21:13:28 2013
@@ -461,6 +461,8 @@ IMPROVEMENTS:
ZOOKEEPER-1535. ZK Shell/Cli re-executes last command on exit (Edward Ribeiro via camille)
+ ZOOKEEPER-1324. Remove Duplicate NEWLEADER packets from the Leader to the Follower. (thawan
via fpj)
+
Release 3.4.0 -
Non-backward compatible changes:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1433651&r1=1433650&r2=1433651&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Tue Jan 15
21:13:28 2013
@@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -75,6 +76,8 @@ public class Leader {
final QuorumPeer self;
+ private boolean quorumFormed = false;
+
// the follower acceptor thread
volatile LearnerCnxAcceptor cnxAcceptor = null;
@@ -376,8 +379,6 @@ public class Leader {
LOG.info("NEWLEADER proposal has Zxid of "
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
}
- outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
- newLeaderProposal.ackSet.add(self.getId());
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
@@ -385,35 +386,24 @@ public class Leader {
// We have to get at least a majority of servers in sync with
// us. We do this by waiting for the NEWLEADER packet to get
// acknowledged
- while (!self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
- //while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {
- if (self.tick > self.initLimit) {
- // Followers aren't syncing fast enough,
- // renounce leadership!
- StringBuilder ackToString = new StringBuilder();
- for(Long id : newLeaderProposal.ackSet)
- ackToString.append(id + ": ");
-
- shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
- HashSet<Long> followerSet = new HashSet<Long>();
-
- for(LearnerHandler f : getLearners()) {
- if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
- followerSet.add(f.getSid());
- }
- }
-
- if (self.getQuorumVerifier().containsQuorum(followerSet)) {
- //if (followers.size() >= self.quorumPeers.size() / 2) {
- LOG.warn("Enough followers present. "+
- "Perhaps the initTicks need to be increased.");
- }
- return;
+ try {
+ waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
+ } catch (InterruptedException e) {
+ shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ + getSidSetString(newLeaderProposal.ackSet) + " ]");
+ HashSet<Long> followerSet = new HashSet<Long>();
+ for (LearnerHandler f : learners)
+ followerSet.add(f.getSid());
+
+ if (self.getQuorumVerifier().containsQuorum(followerSet)) {
+ LOG.warn("Enough followers present. "
+ + "Perhaps the initTicks need to be increased.");
}
- Thread.sleep(self.tickTime);
- self.tick++;
+ return;
}
+ startZkServer();
+
/**
* WARNING: do not use this for anything other than QA testing
* on a real cluster. Specifically to enable verification that quorum
@@ -468,9 +458,8 @@ public class Leader {
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet))
{
//if (!tickSkip && syncedCount < self.quorumPeers.size() / 2)
{
// Lost quorum, shutdown
- // TODO: message is wrong unless majority quorums used
- shutdown("Only " + syncedSet.size() + " followers, need "
- + (self.getVotingView().size() / 2));
+ shutdown("Not sufficient followers synced, only synced with sids: [ "
+ + getSidSetString(syncedSet) + " ]");
// make sure the order is the same!
// the leader goes to looking
return;
@@ -544,6 +533,15 @@ public class Leader {
LOG.trace("outstanding proposals all");
}
+ if ((zxid & 0xffffffffL) == 0) {
+ /*
+ * We no longer process NEWLEADER ack by this method. However,
+ * the learner sends ack back to the leader after it gets UPTODATE
+ * so we just ignore the message.
+ */
+ return;
+ }
+
if (outstandingProposals.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("outstanding is 0");
@@ -580,26 +578,17 @@ public class Leader {
if (p.request != null) {
toBeApplied.add(p);
}
- // We don't commit the new leader proposal
- if ((zxid & 0xffffffffL) != 0) {
- if (p.request == null) {
- LOG.warn("Going to commmit null request for proposal: {}", p);
- }
- commit(zxid);
- inform(p);
- zk.commitProcessor.commit(p.request);
- if(pendingSyncs.containsKey(zxid)){
- for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
- sendSync(r);
- }
+
+ if (p.request == null) {
+ LOG.warn("Going to commmit null request for proposal: {}", p);
+ }
+ commit(zxid);
+ inform(p);
+ zk.commitProcessor.commit(p.request);
+ if(pendingSyncs.containsKey(zxid)){
+ for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
+ sendSync(r);
}
- return;
- } else {
- lastCommitted = zxid;
- LOG.info("Have quorum of supporters; starting up and setting last processed
zxid: 0x{}",
- Long.toHexString(zk.getZxid()));
- zk.startup();
- zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
}
}
}
@@ -923,6 +912,86 @@ public class Leader {
}
}
}
+
+ /**
+ * Return a list of sid in set as string
+ */
+ private String getSidSetString(Set<Long> sidSet) {
+ StringBuilder sids = new StringBuilder();
+ Iterator<Long> iter = sidSet.iterator();
+ while (iter.hasNext()) {
+ sids.append(iter.next());
+ if (!iter.hasNext()) {
+ break;
+ }
+ sids.append(",");
+ }
+ return sids.toString();
+ }
+
+ /**
+ * Start up Leader ZooKeeper server and initialize zxid to the new epoch
+ */
+ private synchronized void startZkServer() {
+ // Update lastCommitted and Db's zxid to a value representing the new epoch
+ lastCommitted = zk.getZxid();
+ LOG.info("Have quorum of supporters, sids: [ "
+ + getSidSetString(newLeaderProposal.ackSet)
+ + " ]; starting up and setting last processed zxid: 0x{}",
+ Long.toHexString(zk.getZxid()));
+ zk.startup();
+ zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
+ }
+
+ /**
+ * Process NEWLEADER ack of a given sid and wait until the leader receives
+ * sufficient acks.
+ *
+ * @param sid
+ * @param learnerType
+ * @throws InterruptedException
+ */
+ public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType)
+ throws InterruptedException {
+
+ synchronized (newLeaderProposal.ackSet) {
+
+ if (quorumFormed) {
+ return;
+ }
+
+ long currentZxid = newLeaderProposal.packet.getZxid();
+ if (zxid != currentZxid) {
+ LOG.error("NEWLEADER ACK from sid: " + sid
+ + " is from a different epoch - current 0x"
+ + Long.toHexString(currentZxid) + " receieved 0x"
+ + Long.toHexString(zxid));
+ return;
+ }
+
+ if (learnerType == LearnerType.PARTICIPANT) {
+ newLeaderProposal.ackSet.add(sid);
+ }
+
+ if (self.getQuorumVerifier().containsQuorum(
+ newLeaderProposal.ackSet)) {
+ quorumFormed = true;
+ newLeaderProposal.ackSet.notifyAll();
+ } else {
+ long start = System.currentTimeMillis();
+ long cur = start;
+ long end = start + self.getInitLimit() * self.getTickTime();
+ while (!quorumFormed && cur < end) {
+ newLeaderProposal.ackSet.wait(end - cur);
+ cur = System.currentTimeMillis();
+ }
+ if (!quorumFormed) {
+ throw new InterruptedException(
+ "Timeout while waiting for NEWLEADER to be acked by quorum");
+ }
+ }
+ }
+ }
/**
* Get string representation of a given packet type
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1433651&r1=1433650&r2=1433651&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Tue
Jan 15 21:13:28 2013
@@ -456,7 +456,7 @@ public class LearnerHandler extends Thre
LOG.error("Next packet was supposed to be an ACK");
return;
}
- leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
+ leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1433651&r1=1433650&r2=1433651&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Tue Jan
15 21:13:28 2013
@@ -828,15 +828,6 @@ public class Zab1_0Test {
oa.writeRecord(qp, null);
readPacketSkippingPing(ia, qp);
- Assert.assertEquals(Leader.NEWLEADER, qp.getType());
- Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
- Assert.assertEquals(1, l.self.getAcceptedEpoch());
- Assert.assertEquals(1, l.self.getCurrentEpoch());
-
- qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
- oa.writeRecord(qp, null);
-
- readPacketSkippingPing(ia, qp);
Assert.assertEquals(Leader.UPTODATE, qp.getType());
}
});
@@ -881,13 +872,6 @@ public class Zab1_0Test {
oa.writeRecord(qp, null);
readPacketSkippingPing(ia, qp);
- Assert.assertEquals(Leader.NEWLEADER, qp.getType());
- Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
-
- qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
- oa.writeRecord(qp, null);
-
- readPacketSkippingPing(ia, qp);
Assert.assertEquals(Leader.UPTODATE, qp.getType());
}
});
|