zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject svn commit: r1482318 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/Leader.java src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Date Tue, 14 May 2013 12:27:12 GMT
Author: fpj
Date: Tue May 14 12:27:11 2013
New Revision: 1482318

URL: http://svn.apache.org/r1482318
Log:
ZOOKEEPER-1324. Remove Duplicate NEWLEADER packets 
from the Leader to the Follower. (Thawan, fpj via fpj)


Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1482318&r1=1482317&r2=1482318&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue May 14 12:27:11 2013
@@ -350,6 +350,9 @@ BUGFIXES:
 
   ZOOKEEPER-1706. Typo in Double Barriers example (Jingguo Yao via fpj)
 
+  ZOOKEEPER-1324. Remove Duplicate NEWLEADER packets 
+  from the Leader to the Follower. (Thawan, fpj via fpj)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1482318&r1=1482317&r2=1482318&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Tue May 14 12:27:11 2013
@@ -33,6 +33,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -96,6 +97,16 @@ public class Leader {
             }
             return true;
         }
+        
+        public String ackSetsToString(){
+            StringBuilder sb = new StringBuilder();
+            
+            for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
+                sb.append(qvAckset.getAckset().toString()).append(",");
+            }
+            
+            return sb.substring(0, sb.length()-1);
+        }
 
         public static class QuorumVerifierAcksetPair {
             private final QuorumVerifier _qv;
@@ -120,7 +131,8 @@ public class Leader {
 
     final QuorumPeer self;
 
-    
+    private boolean quorumFormed = false;
+
     // the follower acceptor thread
     volatile LearnerCnxAcceptor cnxAcceptor = null;
 
@@ -461,10 +473,7 @@ public class Leader {
             if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
                newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
             }
-            newLeaderProposal.addAck(self.getId());
             
-            outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
                                 
-            LOG.debug("put newleader into outstanding proposals");
             // We have to get at least a majority of servers in sync with
             // us. We do this by waiting for the NEWLEADER packet to get
             // acknowledged
@@ -472,47 +481,34 @@ public class Leader {
              waitForEpochAck(self.getId(), leaderStateSummary);
              self.setCurrentEpoch(epoch);    
             
-            while (!newLeaderProposal.hasAllQuorums()){           
-            //while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {
-                if (self.tick > self.initLimit) {
-                    // Followers aren't syncing fast enough,
-                    // renounce leadership!
-                    StringBuilder ackToString = new StringBuilder();
-                    
-                    for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
-                       if (ackToString.length() > 0) ackToString.append('\n');
-                       if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())) {
-                           ackToString.append("Configuration " + qvAckset.getQuorumVerifier().getVersion() + ": waiting for a quorum of followers, only synced with: ");
-                           for(Long id : qvAckset.getAckset())
-                                ackToString.append(id + " ");
-                       }
-                    }
-                    shutdown(ackToString.toString());
-                    HashSet<Long> followerSet = new HashSet<Long>();
-
-                    for(LearnerHandler f : getLearners()) {
-                        if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
-                            followerSet.add(f.getSid());
-                        }
-                    }
-                    
-                    boolean initTicksShouldBeIncreased = true;
-                    for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
-                       if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
-                           initTicksShouldBeIncreased = false;
-                           break;
-                       }
-                    }                  
-                    if (initTicksShouldBeIncreased) {
-                        LOG.warn("Enough followers present. "+
-                                "Perhaps the initTicks need to be increased.");
-                    }
-                    return;
-                }
-                Thread.sleep(self.tickTime);
-                self.tick++;
-            }
+             try {
+                 waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
+             } catch (InterruptedException e) {
+                 shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+                         + newLeaderProposal.ackSetsToString() + " ]");
+                 HashSet<Long> followerSet = new HashSet<Long>();
+
+                 for(LearnerHandler f : getLearners()) {
+                     if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
+                         followerSet.add(f.getSid());
+                     }
+                 }    
+                 boolean initTicksShouldBeIncreased = true;
+                 for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
+                     if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
+                         initTicksShouldBeIncreased = false;
+                         break;
+                     }
+                 }                  
+                 if (initTicksShouldBeIncreased) {
+                     LOG.warn("Enough followers present. "+
+                             "Perhaps the initTicks need to be increased.");
+                 }
+                 return;
+             }
 
+             startZkServer();
+             
             /**
              * WARNING: do not use this for anything other than QA testing
              * on a real cluster. Specifically to enable verification that quorum
@@ -565,14 +561,13 @@ public class Leader {
                 }
 
               if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
-                //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
-                    // Lost quorum, shutdown
-                  // TODO: message is wrong unless majority quorums used
-                    shutdown("Only " + syncedSet.size() + " voting followers, need "
-                            + (self.getVotingView().size() / 2));
-                    // make sure the order is the same!
-                    // the leader goes to looking
-                    return;
+                  //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
+                  // Lost quorum, shutdown
+                  shutdown("Not sufficient followers synced, only synced with sids: [ "
+                          + getSidSetString(syncedSet) + " ]");
+                  // make sure the order is the same!
+                  // the leader goes to looking
+                  return;
               }
               tickSkip = !tickSkip;
             }
@@ -708,63 +703,44 @@ public class Leader {
         if (p.request != null) {
              toBeApplied.add(p);
         }
-     
-        // We don't commit the new leader proposal
-        if ((zxid & 0xffffffffL) != 0) {
-           if (p.request == null) {
-                LOG.warn("Going to commmit null: " + p);
-           } else if (p.request.getHdr().getType() == OpCode.reconfig) {                
                  
-                   LOG.debug("Committing a reconfiguration! " + outstandingProposals.size());

+
+        if (p.request == null) {
+            LOG.warn("Going to commmit null: " + p);
+        } else if (p.request.getHdr().getType() == OpCode.reconfig) {                   
               
+            LOG.debug("Committing a reconfiguration! " + outstandingProposals.size()); 
                  
-                   //if this server is voter in new config with the same quorum address,

-                   //then it will remain the leader
-                   //otherwise an up-to-date follower will be designated as leader. This saves
-                   //leader election time, unless the designated leader fails           
                 
-                    Long designatedLeader = getDesignatedLeader(p, zxid);
-                    //LOG.warn("designated leader is: " + designatedLeader);
-                    
-                    QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
-                   
-                    self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
-                   
-                   if (designatedLeader != self.getId()) {
-                       allowedToCommit = false;
-                   }
+            //if this server is voter in new config with the same quorum address, 
+            //then it will remain the leader
+            //otherwise an up-to-date follower will be designated as leader. This saves
+            //leader election time, unless the designated leader fails                  
          
+            Long designatedLeader = getDesignatedLeader(p, zxid);
+            //LOG.warn("designated leader is: " + designatedLeader);
+
+            QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
+       
+            self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
+       
+            if (designatedLeader != self.getId()) {
+                allowedToCommit = false;
+            }
                    
-                   // we're sending the designated leader, and if the leader is changing the followers are 
-                   // responsible for closing the connection - this way we are sure that at least a majority of them 
-                   // receive the commit message.
-                   commitAndActivate(zxid, designatedLeader);
-                   informAndActivate(p, designatedLeader);
-                   //turnOffFollowers();
-               } else {
-                   commit(zxid);
-                   inform(p);
-               }
-               zk.commitProcessor.commit(p.request);
-               if(pendingSyncs.containsKey(zxid)){
-                   for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
-                      sendSync(r);
-                   }
-               }                
+            // we're sending the designated leader, and if the leader is changing the followers are 
+            // responsible for closing the connection - this way we are sure that at least a majority of them 
+            // receive the commit message.
+            commitAndActivate(zxid, designatedLeader);
+            informAndActivate(p, designatedLeader);
+            //turnOffFollowers();
         } else {
-                lastCommitted = zxid;                
-                if(LOG.isInfoEnabled()){
-                    LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid());
-                }
-                QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
-               
-               Long designatedLeader = getDesignatedLeader(p, zxid);                    
                    
-               
-               self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
-               if (designatedLeader != self.getId()) {
-                   allowedToCommit = false;
-               }
-               LOG.debug("GOT QUORUM of ACKS FOR NEWLEADER msg " + allowedToCommit);    
  
-                zk.startup();
-                zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
-                
+            commit(zxid);
+            inform(p);
         }
+        zk.commitProcessor.commit(p.request);
+        if(pendingSyncs.containsKey(zxid)){
+            for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
+                sendSync(r);
+            }               
+        } 
+        
         return  true;   
     }
     
@@ -788,6 +764,17 @@ public class Leader {
             }
             LOG.trace("outstanding proposals all");
         }
+        
+        if ((zxid & 0xffffffffL) == 0) {
+            /*
+             * We no longer process NEWLEADER ack with this method. However,
+             * the learner sends an ack back to the leader after it gets
+             * UPTODATE, so we just ignore the message.
+             */
+            return;
+        }
+            
+            
         if (outstandingProposals.size() == 0) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("outstanding is 0");
@@ -1197,6 +1184,104 @@ public class Leader {
             }
         }
     }
+    
+    /**
+     * Return a list of sid in set as string  
+     */
+    private String getSidSetString(Set<Long> sidSet) {
+        StringBuilder sids = new StringBuilder();
+        Iterator<Long> iter = sidSet.iterator();
+        while (iter.hasNext()) {
+            sids.append(iter.next());
+            if (!iter.hasNext()) {
+              break;
+            }
+            sids.append(",");
+        }
+        return sids.toString();
+    }
+
+    /**
+     * Start up Leader ZooKeeper server and initialize zxid to the new epoch
+     */
+    private synchronized void startZkServer() {
+        // Update lastCommitted and Db's zxid to a value representing the new epoch
+        lastCommitted = zk.getZxid();
+        LOG.info("Have quorum of supporters, sids: [ "
+                + newLeaderProposal.ackSetsToString()
+                + " ]; starting up and setting last processed zxid: 0x{}",
+                Long.toHexString(zk.getZxid()));
+        
+        /*
+         * ZOOKEEPER-1324. the leader sends the new config it must complete
+         *  to others inside a NEWLEADER message (see LearnerHandler where
+         *  the NEWLEADER message is constructed), and once it has enough
+         *  acks we must execute the following code so that it applies the
+         *  config to itself.
+         */
+        QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
+        
+        Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());   
                                     
+        
+        self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
+        if (designatedLeader != self.getId()) {
+            allowedToCommit = false;
+        }
+        
+        zk.startup();
+        zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
+    }
+
+    /**
+     * Process NEWLEADER ack of a given sid and wait until the leader receives
+     * sufficient acks.
+     *
+     * @param sid
+     * @param learnerType
+     * @throws InterruptedException
+     */
+    public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType)
+            throws InterruptedException {
+
+        synchronized (newLeaderProposal.qvAcksetPairs) {
+
+            if (quorumFormed) {
+                return;
+            }
+
+            long currentZxid = newLeaderProposal.packet.getZxid();
+            if (zxid != currentZxid) {
+                LOG.error("NEWLEADER ACK from sid: " + sid
+                        + " is from a different epoch - current 0x"
+                        + Long.toHexString(currentZxid) + " receieved 0x"
+                        + Long.toHexString(zxid));
+                return;
+            }
+
+            /*
+             * Note that addAck already checks that the learner
+             * is a PARTICIPANT.
+             */
+            newLeaderProposal.addAck(sid);
+
+            if (newLeaderProposal.hasAllQuorums()) {
+                quorumFormed = true;
+                newLeaderProposal.qvAcksetPairs.notifyAll();
+            } else {
+                long start = System.currentTimeMillis();
+                long cur = start;
+                long end = start + self.getInitLimit() * self.getTickTime();
+                while (!quorumFormed && cur < end) {
+                    newLeaderProposal.qvAcksetPairs.wait(end - cur);
+                    cur = System.currentTimeMillis();
+                }
+                if (!quorumFormed) {
+                    throw new InterruptedException(
+                            "Timeout while waiting for NEWLEADER to be acked by quorum");
+                }
+            }
+        }
+    }
 
     /**
      * Get string representation of a given packet type

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1482318&r1=1482317&r2=1482318&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Tue May 14 12:27:11 2013
@@ -475,8 +475,11 @@ public class LearnerHandler extends Thre
                 LOG.error("Next packet was supposed to be an ACK");
                 return;
             }
-            LOG.info("Received NEWLEADER-ACK message from " + sid);
-            leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
+
+            if(LOG.isDebugEnabled()){
+            	LOG.debug("Received NEWLEADER-ACK message from " + sid);   
+            }
+            leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
             
             // now that the ack has been processed expect the syncLimit
             sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1482318&r1=1482317&r2=1482318&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Tue May 14 12:27:11 2013
@@ -832,15 +832,6 @@ public class Zab1_0Test {
                 oa.writeRecord(qp, null);
 
                 readPacketSkippingPing(ia, qp);
-                Assert.assertEquals(Leader.NEWLEADER, qp.getType());
-                Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
-                Assert.assertEquals(1, l.self.getAcceptedEpoch());
-                Assert.assertEquals(1, l.self.getCurrentEpoch());
-                
-                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
-                oa.writeRecord(qp, null);
-
-                readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.UPTODATE, qp.getType());
             }
         });
@@ -885,13 +876,6 @@ public class Zab1_0Test {
                 oa.writeRecord(qp, null);
 
                 readPacketSkippingPing(ia, qp);
-                Assert.assertEquals(Leader.NEWLEADER, qp.getType());
-                Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
-
-                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
-                oa.writeRecord(qp, null);
-
-                readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.UPTODATE, qp.getType());
             }
         });



Mime
View raw message