zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject svn commit: r1433651 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/Leader.java src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Date Tue, 15 Jan 2013 21:13:28 GMT
Author: fpj
Date: Tue Jan 15 21:13:28 2013
New Revision: 1433651

URL: http://svn.apache.org/viewvc?rev=1433651&view=rev
Log:
ZOOKEEPER-1324. Remove Duplicate NEWLEADER packets from the Leader to the Follower. (thawan
via fpj)


Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1433651&r1=1433650&r2=1433651&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue Jan 15 21:13:28 2013
@@ -461,6 +461,8 @@ IMPROVEMENTS:
 
   ZOOKEEPER-1535. ZK Shell/Cli re-executes last command on exit (Edward Ribeiro via camille)
 
+  ZOOKEEPER-1324. Remove Duplicate NEWLEADER packets from the Leader to the Follower. (thawan via fpj)
+
 Release 3.4.0 - 
 
 Non-backward compatible changes:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1433651&r1=1433650&r2=1433651&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Tue Jan 15 21:13:28 2013
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -75,6 +76,8 @@ public class Leader {
 
     final QuorumPeer self;
 
+    private boolean quorumFormed = false;
+
     // the follower acceptor thread
     volatile LearnerCnxAcceptor cnxAcceptor = null;
 
@@ -376,8 +379,6 @@ public class Leader {
                 LOG.info("NEWLEADER proposal has Zxid of "
                         + Long.toHexString(newLeaderProposal.packet.getZxid()));
             }
-            outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
-            newLeaderProposal.ackSet.add(self.getId());
 
             waitForEpochAck(self.getId(), leaderStateSummary);
             self.setCurrentEpoch(epoch);
@@ -385,35 +386,24 @@ public class Leader {
             // We have to get at least a majority of servers in sync with
             // us. We do this by waiting for the NEWLEADER packet to get
             // acknowledged
-            while (!self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
-            //while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {
-                if (self.tick > self.initLimit) {
-                    // Followers aren't syncing fast enough,
-                    // renounce leadership!
-                    StringBuilder ackToString = new StringBuilder();
-                    for(Long id : newLeaderProposal.ackSet)
-                        ackToString.append(id + ": ");
-
-                    shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
-                    HashSet<Long> followerSet = new HashSet<Long>();
-
-                    for(LearnerHandler f : getLearners()) {
-                        if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
-                            followerSet.add(f.getSid());
-                        }
-                    }
-
-                    if (self.getQuorumVerifier().containsQuorum(followerSet)) {
-                    //if (followers.size() >= self.quorumPeers.size() / 2) {
-                        LOG.warn("Enough followers present. "+
-                                "Perhaps the initTicks need to be increased.");
-                    }
-                    return;
+            try {
+                waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
+            } catch (InterruptedException e) {
+                shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+                        + getSidSetString(newLeaderProposal.ackSet) + " ]");
+                HashSet<Long> followerSet = new HashSet<Long>();
+                for (LearnerHandler f : learners)
+                    followerSet.add(f.getSid());
+
+                if (self.getQuorumVerifier().containsQuorum(followerSet)) {
+                    LOG.warn("Enough followers present. "
+                            + "Perhaps the initTicks need to be increased.");
                 }
-                Thread.sleep(self.tickTime);
-                self.tick++;
+                return;
             }
 
+            startZkServer();
+
             /**
              * WARNING: do not use this for anything other than QA testing
              * on a real cluster. Specifically to enable verification that quorum
@@ -468,9 +458,8 @@ public class Leader {
               if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                 //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                     // Lost quorum, shutdown
-                  // TODO: message is wrong unless majority quorums used
-                    shutdown("Only " + syncedSet.size() + " followers, need "
-                            + (self.getVotingView().size() / 2));
+                    shutdown("Not sufficient followers synced, only synced with sids: [ "
+                            + getSidSetString(syncedSet) + " ]");
                     // make sure the order is the same!
                     // the leader goes to looking
                     return;
@@ -544,6 +533,15 @@ public class Leader {
             LOG.trace("outstanding proposals all");
         }
 
+        if ((zxid & 0xffffffffL) == 0) {
+            /*
+             * We no longer process NEWLEADER ack by this method. However,
+             * the learner sends ack back to the leader after it gets UPTODATE
+             * so we just ignore the message.
+             */
+            return;
+        }
+
         if (outstandingProposals.size() == 0) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("outstanding is 0");
@@ -580,26 +578,17 @@ public class Leader {
             if (p.request != null) {
                 toBeApplied.add(p);
             }
-            // We don't commit the new leader proposal
-            if ((zxid & 0xffffffffL) != 0) {
-                if (p.request == null) {
-                    LOG.warn("Going to commmit null request for proposal: {}", p);
-                }
-                commit(zxid);
-                inform(p);
-                zk.commitProcessor.commit(p.request);
-                if(pendingSyncs.containsKey(zxid)){
-                    for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
-                        sendSync(r);
-                    }
+
+            if (p.request == null) {
+                LOG.warn("Going to commmit null request for proposal: {}", p);
+            }
+            commit(zxid);
+            inform(p);
+            zk.commitProcessor.commit(p.request);
+            if(pendingSyncs.containsKey(zxid)){
+                for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
+                    sendSync(r);
                 }
-                return;
-            } else {
-                lastCommitted = zxid;
-                LOG.info("Have quorum of supporters; starting up and setting last processed zxid: 0x{}",
-                        Long.toHexString(zk.getZxid()));
-                zk.startup();
-                zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
             }
         }
     }
@@ -923,6 +912,86 @@ public class Leader {
             }
         }
     }
+    
+    /**
+     * Return a list of sid in set as string  
+     */
+    private String getSidSetString(Set<Long> sidSet) {
+        StringBuilder sids = new StringBuilder();
+        Iterator<Long> iter = sidSet.iterator();
+        while (iter.hasNext()) {
+            sids.append(iter.next());
+            if (!iter.hasNext()) {
+              break;
+            }
+            sids.append(",");
+        }
+        return sids.toString();
+    }
+
+    /**
+     * Start up Leader ZooKeeper server and initialize zxid to the new epoch
+     */
+    private synchronized void startZkServer() {
+        // Update lastCommitted and Db's zxid to a value representing the new epoch
+        lastCommitted = zk.getZxid();
+        LOG.info("Have quorum of supporters, sids: [ "
+                + getSidSetString(newLeaderProposal.ackSet)
+                + " ]; starting up and setting last processed zxid: 0x{}",
+                Long.toHexString(zk.getZxid()));
+        zk.startup();
+        zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
+    }
+
+    /**
+     * Process NEWLEADER ack of a given sid and wait until the leader receives
+     * sufficient acks.
+     *
+     * @param sid
+     * @param learnerType
+     * @throws InterruptedException
+     */
+    public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType)
+            throws InterruptedException {
+
+        synchronized (newLeaderProposal.ackSet) {
+
+            if (quorumFormed) {
+                return;
+            }
+
+            long currentZxid = newLeaderProposal.packet.getZxid();
+            if (zxid != currentZxid) {
+                LOG.error("NEWLEADER ACK from sid: " + sid
+                        + " is from a different epoch - current 0x"
+                        + Long.toHexString(currentZxid) + " receieved 0x"
+                        + Long.toHexString(zxid));
+                return;
+            }
+
+            if (learnerType == LearnerType.PARTICIPANT) {
+                newLeaderProposal.ackSet.add(sid);
+            }
+
+            if (self.getQuorumVerifier().containsQuorum(
+                    newLeaderProposal.ackSet)) {
+                quorumFormed = true;
+                newLeaderProposal.ackSet.notifyAll();
+            } else {
+                long start = System.currentTimeMillis();
+                long cur = start;
+                long end = start + self.getInitLimit() * self.getTickTime();
+                while (!quorumFormed && cur < end) {
+                    newLeaderProposal.ackSet.wait(end - cur);
+                    cur = System.currentTimeMillis();
+                }
+                if (!quorumFormed) {
+                    throw new InterruptedException(
+                            "Timeout while waiting for NEWLEADER to be acked by quorum");
+                }
+            }
+        }
+    }
 
     /**
      * Get string representation of a given packet type

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1433651&r1=1433650&r2=1433651&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Tue Jan 15 21:13:28 2013
@@ -456,7 +456,7 @@ public class LearnerHandler extends Thre
                 LOG.error("Next packet was supposed to be an ACK");
                 return;
             }
-            leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
+            leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
             
             // now that the ack has been processed expect the syncLimit
             sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1433651&r1=1433650&r2=1433651&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Tue Jan 15 21:13:28 2013
@@ -828,15 +828,6 @@ public class Zab1_0Test {
                 oa.writeRecord(qp, null);
 
                 readPacketSkippingPing(ia, qp);
-                Assert.assertEquals(Leader.NEWLEADER, qp.getType());
-                Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
-                Assert.assertEquals(1, l.self.getAcceptedEpoch());
-                Assert.assertEquals(1, l.self.getCurrentEpoch());
-                
-                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
-                oa.writeRecord(qp, null);
-
-                readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.UPTODATE, qp.getType());
             }
         });
@@ -881,13 +872,6 @@ public class Zab1_0Test {
                 oa.writeRecord(qp, null);
 
                 readPacketSkippingPing(ia, qp);
-                Assert.assertEquals(Leader.NEWLEADER, qp.getType());
-                Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
-
-                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
-                oa.writeRecord(qp, null);
-
-                readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.UPTODATE, qp.getType());
             }
         });



Mime
View raw message