zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject svn commit: r1135382 - in /zookeeper/trunk: ./ src/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/main/org/apache/zookeeper/server/util/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/...
Date Tue, 14 Jun 2011 05:14:06 GMT
Author: breed
Date: Tue Jun 14 05:14:05 2011
New Revision: 1135382

URL: http://svn.apache.org/viewvc?rev=1135382&view=rev
Log:
ZOOKEEPER-335. zookeeper servers should commit the new leader txn to their logs.
ZOOKEEPER-1081. modify leader/follower code to correctly deal with new leader
ZOOKEEPER-1082. modify leader election to correctly take into account current epoch

Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/StateSummary.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ZxidUtils.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
    zookeeper/trunk/src/zookeeper.jute

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue Jun 14 05:14:05 2011
@@ -223,6 +223,12 @@ BUGFIXES: 
 
   ZOOKEEPER-1086. zookeeper test jar has non mavenised dependency. (Ivan Kelly via michim)
 
+  ZOOKEEPER-335. zookeeper servers should commit the new leader txn to their logs. (breed)
+
+  ZOOKEEPER-1081. modify leader/follower code to correctly deal with new leader (breed)
+
+  ZOOKEEPER-1082. modify leader election to correctly take into account current epoch (fpj via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Tue Jun 14 05:14:05 2011
@@ -18,10 +18,19 @@
 
 package org.apache.zookeeper.server;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -57,6 +66,7 @@ import org.apache.zookeeper.server.auth.
 import org.apache.zookeeper.server.auth.ProviderRegistry;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
+import org.apache.zookeeper.server.util.ZxidUtils;
 
 /**
  * This class implements a simple standalone ZooKeeperServer. It sets up the
@@ -253,6 +263,7 @@ public class ZooKeeperServer implements 
             // XXX: Is lastProcessedZxid really the best thing to use?
             killSession(session, zkDb.getDataTreeLastProcessedZxid());
         }
+
         // Make a clean snapshot
         takeSnapshot();
     }
@@ -884,4 +895,6 @@ public class ZooKeeperServer implements 
         }
         cnxn.incrOutstandingRequests(h);
     }
+
+
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Tue Jun 14 05:14:05 2011
@@ -19,6 +19,7 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,6 +34,7 @@ import org.apache.zookeeper.server.quoru
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.util.ZxidUtils;
 
 
 /**
@@ -95,7 +97,7 @@ public class FastLeaderElection implemen
         /*
          * Epoch
          */
-        long epoch;
+        long electionEpoch;
 
         /*
          * current state of sender
@@ -106,6 +108,11 @@ public class FastLeaderElection implemen
          * Address of sender
          */
         long sid;
+
+        /*
+         * epoch of the proposed leader
+         */
+        long peerEpoch;
     }
 
     /**
@@ -119,15 +126,17 @@ public class FastLeaderElection implemen
         ToSend(mType type,
                 long leader,
                 long zxid,
-                long epoch,
+                long electionEpoch,
                 ServerState state,
-                long sid) {
+                long sid,
+                long peerEpoch) {
 
             this.leader = leader;
             this.zxid = zxid;
-            this.epoch = epoch;
+            this.electionEpoch = electionEpoch;
             this.state = state;
             this.sid = sid;
+            this.peerEpoch = peerEpoch;
         }
 
         /*
@@ -143,7 +152,7 @@ public class FastLeaderElection implemen
         /*
          * Epoch
          */
-        long epoch;
+        long electionEpoch;
 
         /*
          * Current state;
@@ -154,6 +163,11 @@ public class FastLeaderElection implemen
          * Address of recipient
          */
         long sid;
+        
+        /*
+         * Epoch of the proposed leader
+         */
+        long peerEpoch;
     }
 
     LinkedBlockingQueue<ToSend> sendqueue;
@@ -206,7 +220,8 @@ public class FastLeaderElection implemen
                                     current.zxid,
                                     logicalclock,
                                     self.getPeerState(),
-                                    response.sid);
+                                    response.sid,
+                                    current.peerEpoch);
 
                             sendqueue.offer(notmsg);
                         } else {
@@ -216,11 +231,15 @@ public class FastLeaderElection implemen
                                         + self.getId());
                             }
 
+                            /*
+                             * The minimum is 28 bytes (the older format without peerEpoch) for backward compatibility
+                             */
                             if (response.buffer.capacity() < 28) {
                                 LOG.error("Got a short response: "
                                         + response.buffer.capacity());
                                 continue;
                             }
+                            boolean backCompatibility = (response.buffer.capacity() == 28);
                             response.buffer.clear();
 
                             // State of peer that sent this message
@@ -244,9 +263,17 @@ public class FastLeaderElection implemen
                             Notification n = new Notification();
                             n.leader = response.buffer.getLong();
                             n.zxid = response.buffer.getLong();
-                            n.epoch = response.buffer.getLong();
+                            n.electionEpoch = response.buffer.getLong();
                             n.state = ackstate;
                             n.sid = response.sid;
+                            if(!backCompatibility){
+                                n.peerEpoch = response.buffer.getLong();
+                            } else {
+                                if(LOG.isInfoEnabled()){
+                                    LOG.info("Backward compatibility mode, server id: " + n.sid);
+                                }
+                                n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
+                            }
 
                             /*
                              * Print notification info
@@ -268,14 +295,15 @@ public class FastLeaderElection implemen
                                  * lagging behind.
                                  */
                                 if((ackstate == QuorumPeer.ServerState.LOOKING)
-                                        && (n.epoch < logicalclock)){
+                                        && (n.electionEpoch < logicalclock)){
                                     Vote v = getVote();
                                     ToSend notmsg = new ToSend(ToSend.mType.notification,
                                             v.id,
                                             v.zxid,
                                             logicalclock,
                                             self.getPeerState(),
-                                            response.sid);
+                                            response.sid,
+                                            v.peerEpoch);
                                     sendqueue.offer(notmsg);
                                 }
                             } else {
@@ -298,7 +326,8 @@ public class FastLeaderElection implemen
                                             current.zxid,
                                             logicalclock,
                                             self.getPeerState(),
-                                            response.sid);
+                                            response.sid,
+                                            current.peerEpoch);
                                     sendqueue.offer(notmsg);
                                 }
                             }
@@ -347,7 +376,7 @@ public class FastLeaderElection implemen
              * @param m     message to send
              */
             private void process(ToSend m) {
-                byte requestBytes[] = new byte[28];
+                byte requestBytes[] = new byte[36];
                 ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
 
                 /*
@@ -358,7 +387,8 @@ public class FastLeaderElection implemen
                 requestBuffer.putInt(m.state.ordinal());
                 requestBuffer.putLong(m.leader);
                 requestBuffer.putLong(m.zxid);
-                requestBuffer.putLong(m.epoch);
+                requestBuffer.putLong(m.electionEpoch);
+                requestBuffer.putLong(m.peerEpoch);
 
                 manager.toSend(m.sid, requestBuffer);
 
@@ -413,6 +443,7 @@ public class FastLeaderElection implemen
     volatile long logicalclock; /* Election instance */
     long proposedLeader;
     long proposedZxid;
+    long proposedEpoch;
 
 
     /**
@@ -494,12 +525,13 @@ public class FastLeaderElection implemen
                     proposedZxid,
                     logicalclock,
                     QuorumPeer.ServerState.LOOKING,
-                    sid);
+                    sid,
+                    proposedEpoch);
             if(LOG.isDebugEnabled()){
                 LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), "  +
                       proposedZxid + " (n.zxid), " + logicalclock  +
                       " (n.round), " + sid + " (recipient), " + self.getId() +
-                      " (myid)");
+                      " (myid), " + proposedEpoch + " (n.peerEpoch)");
             }
             sendqueue.offer(notmsg);
         }
@@ -508,9 +540,9 @@ public class FastLeaderElection implemen
 
     private void printNotification(Notification n){
         LOG.info("Notification: " + n.leader + " (n.leader), " + n.zxid +
-                " (n.zxid), " + n.epoch + " (n.round), " + n.state +
-                " (n.state), " + n.sid + " (n.sid), " + self.getPeerState() +
-                " (my state)");
+                " (n.zxid), " + n.electionEpoch + " (n.round), " + n.state +
+                " (n.state), " + n.sid + " (n.sid), " + n.peerEpoch + " (n.peerEPoch), " +
+                self.getPeerState() + " (my state)");
     }
 
     /**
@@ -520,14 +552,16 @@ public class FastLeaderElection implemen
      * @param id    Server identifier
      * @param zxid  Last zxid observed by the issuer of this vote
      */
-    private boolean totalOrderPredicate(long newId, long newZxid, long curId, long curZxid) {
+    private boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
         LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: " + 
                 newZxid + ", proposed zxid: " + curZxid);
         if(self.getQuorumVerifier().getWeight(newId) == 0){
             return false;
         }
         
-        return ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)));
+        return ((newEpoch > curEpoch) || 
+                ((newEpoch == curEpoch) && (newZxid > curZxid)) || 
+                ((newZxid == curZxid) && (newId > curId)));
     }
 
     /**
@@ -566,12 +600,12 @@ public class FastLeaderElection implemen
      *
      * @param votes set of votes
      * @param   leader  leader id
-     * @param   epoch   epoch id
+     * @param   electionEpoch   epoch id
      */
     private boolean checkLeader(
             HashMap<Long, Vote> votes,
             long leader,
-            long epoch){
+            long electionEpoch){
 
         boolean predicate = true;
 
@@ -590,7 +624,7 @@ public class FastLeaderElection implemen
         return predicate;
     }
 
-    synchronized void updateProposal(long leader, long zxid){
+    synchronized void updateProposal(long leader, long zxid, long epoch){
         if(LOG.isDebugEnabled()){
             LOG.debug("Updating proposal: " + leader + " (newleader), " + zxid +
                   " (newzxid), " + proposedLeader + " (oldleader), " +
@@ -598,10 +632,11 @@ public class FastLeaderElection implemen
         }
         proposedLeader = leader;
         proposedZxid = zxid;
+        proposedEpoch = epoch;
     }
 
     synchronized Vote getVote(){
-        return new Vote(proposedLeader, proposedZxid);
+        return new Vote(proposedLeader, proposedZxid, proposedEpoch);
     }
 
     /**
@@ -645,6 +680,23 @@ public class FastLeaderElection implemen
     }
 
     /**
+     * Returns the initial vote value of the peer epoch.
+     *
+     * @return long
+     */
+    private long getPeerEpoch(){
+        if(self.getLearnerType() == LearnerType.PARTICIPANT)
+        	try {
+        		return self.getCurrentEpoch();
+        	} catch(IOException e) {
+        		RuntimeException re = new RuntimeException(e.getMessage());
+        		re.setStackTrace(e.getStackTrace());
+        		throw re;
+        	}
+        else return Long.MIN_VALUE;
+    }
+    
+    /**
      * Starts a new round of leader election. Whenever our QuorumPeer
      * changes its state to LOOKING, this method is invoked, and it
      * sends notifications to all other peers.
@@ -670,7 +722,7 @@ public class FastLeaderElection implemen
 
             synchronized(this){
                 logicalclock++;
-                    updateProposal(getInitId(), getInitLastLoggedZxid());
+                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
             }
 
             LOG.info("New election. My id =  " + self.getId() +
@@ -717,47 +769,48 @@ public class FastLeaderElection implemen
                     switch (n.state) {
                     case LOOKING:
                         // If notification > current, replace and send messages out
-                        if (n.epoch > logicalclock) {
-                            logicalclock = n.epoch;
+                        if (n.electionEpoch > logicalclock) {
+                            logicalclock = n.electionEpoch;
                             recvset.clear();
-                            if(totalOrderPredicate(n.leader, n.zxid,
-                                    getInitId(), getInitLastLoggedZxid())) {
-                                updateProposal(n.leader, n.zxid);
+                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
+                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
+                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                             } else {
                                 updateProposal(getInitId(),
-                                        getInitLastLoggedZxid());
+                                        getInitLastLoggedZxid(),
+                                        getPeerEpoch());
                             }
                             sendNotifications();
-                        } else if (n.epoch < logicalclock) {
+                        } else if (n.electionEpoch < logicalclock) {
                             if(LOG.isDebugEnabled()){
-                                LOG.debug("Notification epoch is smaller than logicalclock. n.epoch = " + n.epoch
+                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = " + n.electionEpoch
                                         + ", Logical clock" + logicalclock);
                             }
                             break;
-                        } else if (totalOrderPredicate(n.leader, n.zxid,
-                                proposedLeader, proposedZxid)) {
-                            updateProposal(n.leader, n.zxid);
+                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
+                                proposedLeader, proposedZxid, proposedEpoch)) {
+                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                             sendNotifications();
                         }
 
                         if(LOG.isDebugEnabled()){
                             LOG.debug("Adding vote: From = " + n.sid +
                                     ", Proposed leader = " + n.leader +
-                                    ", Porposed zxid = " + n.zxid +
-                                    ", Proposed epoch = " + n.epoch);
+                                    ", Proposed zxid = " + n.zxid +
+                                    ", Proposed election epoch = " + n.electionEpoch);
                         }
 
-                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
+                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
 
                         if (termPredicate(recvset,
                                 new Vote(proposedLeader, proposedZxid,
-                                        logicalclock))) {
+                                        logicalclock, proposedEpoch))) {
 
                             // Verify if there is any change in the proposed leader
                             while((n = recvqueue.poll(finalizeWait,
                                     TimeUnit.MILLISECONDS)) != null){
-                                if(totalOrderPredicate(n.leader, n.zxid,
-                                        proposedLeader, proposedZxid)){
+                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
+                                        proposedLeader, proposedZxid, proposedEpoch)){
                                     recvqueue.put(n);
                                     break;
                                 }
@@ -772,7 +825,7 @@ public class FastLeaderElection implemen
                                         ServerState.LEADING: learningState());
 
                                 Vote endVote = new Vote(proposedLeader,
-                                        proposedZxid);
+                                        proposedZxid, proposedEpoch);
                                 leaveInstance(endVote);
                                 return endVote;
                             }
@@ -787,15 +840,15 @@ public class FastLeaderElection implemen
                          * Consider all notifications from the same epoch
                          * together.
                          */
-                        if(n.epoch == logicalclock){
-                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
+                        if(n.electionEpoch == logicalclock){
+                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                             if(termPredicate(recvset, new Vote(n.leader,
-                                            n.zxid, n.epoch, n.state))
-                                            && checkLeader(outofelection, n.leader, n.epoch)) {
+                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
+                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                 self.setPeerState((n.leader == self.getId()) ?
                                         ServerState.LEADING: learningState());
 
-                                Vote endVote = new Vote(n.leader, n.zxid);
+                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                 leaveInstance(endVote);
                                 return endVote;
                             }
@@ -806,16 +859,16 @@ public class FastLeaderElection implemen
                          * a majority are following the same leader.
                          */
                         outofelection.put(n.sid, new Vote(n.leader, n.zxid,
-                                n.epoch, n.state));
+                                n.electionEpoch, n.peerEpoch, n.state));
                         if (termPredicate(outofelection, new Vote(n.leader,
-                                n.zxid, n.epoch, n.state))
-                                && checkLeader(outofelection, n.leader, n.epoch)) {
+                                n.zxid, n.electionEpoch, n.peerEpoch, n.state))
+                                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                             synchronized(this){
-                                logicalclock = n.epoch;
+                                logicalclock = n.electionEpoch;
                                 self.setPeerState((n.leader == self.getId()) ?
                                         ServerState.LEADING: learningState());
                             }
-                            Vote endVote = new Vote(n.leader, n.zxid);
+                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                             leaveInstance(endVote);
                             return endVote;
                         }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Tue Jun 14 05:14:05 2011
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -64,20 +65,21 @@ public class Follower extends Learner{
         self.start_fle = 0;
         self.end_fle = 0;
         fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
-        try {            
+        try {
             InetSocketAddress addr = findLeader();            
             try {
                 connectToLeader(addr);
-                long newLeaderZxid = registerWithLeader(Leader.FOLLOWERINFO);
+                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
+
                 //check to see if the leader zxid is lower than ours
                 //this should never happen but is just a safety check
-                long lastLoggedZxid = self.getLastLoggedZxid();
-                if ((newLeaderZxid >> 32L) < (lastLoggedZxid >> 32L)) {
-                    LOG.error("Leader epoch " + Long.toHexString(newLeaderZxid >> 32L)
-                            + " is less than our epoch " + Long.toHexString(lastLoggedZxid >> 32L));
+                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
+                if (newEpoch < self.getAcceptedEpoch()) {
+                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                     throw new IOException("Error: Epoch of leader is lower");
                 }
-                syncWithLeader(newLeaderZxid);                
+                syncWithLeader(newEpochZxid);                
                 QuorumPacket qp = new QuorumPacket();
                 while (self.isRunning()) {
                     readPacket(qp);

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Tue Jun 14 05:14:05 2011
@@ -31,6 +31,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -43,6 +44,8 @@ import org.apache.zookeeper.server.Final
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.ZxidUtils;
 
 /**
  * This class has the control logic for the Leader.
@@ -177,6 +180,17 @@ public class Leader {
     final static int UPTODATE = 12;
 
     /**
+     * This message is the first that a follower receives from the leader.
+     * It has the protocol version and the epoch of the leader.
+     */
+    public static final int LEADERINFO = 17;
+
+    /**
+     * This message is used by the follower to ack a proposed epoch.
+     */
+    public static final int ACKEPOCH = 18;
+    
+    /**
      * This message type is sent to a leader to request and mutation operation.
      * The payload will consist of a request header followed by a request.
      */
@@ -219,8 +233,8 @@ public class Leader {
      * This message type informs observers of a committed proposal.
      */
     final static int INFORM = 8;
-    
-    private ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
+
+	ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
 
     ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
 
@@ -263,6 +277,12 @@ public class Leader {
         }
     }
 
+    StateSummary leaderStateSummary;
+    
+    long epoch = -1;
+    boolean waitingForNewEpoch = true;
+    boolean readyToStart = false;
+    
     /**
      * This method is main function that is called to lead
      * 
@@ -282,9 +302,16 @@ public class Leader {
             self.tick = 0;
             zk.loadData();
             
-            long epoch = self.getLastLoggedZxid() >> 32L;
-            epoch++;
-            zk.setZxid(epoch << 32L);
+            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
+
+            // Start thread that waits for connection requests from 
+            // new followers.
+            cnxAcceptor = new LearnerCnxAcceptor();
+            cnxAcceptor.start();
+            
+            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
+            self.setAcceptedEpoch(epoch);
+            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
             
             synchronized(this){
                 lastProposed = zk.getZxid();
@@ -300,11 +327,10 @@ public class Leader {
             }
             outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
             
-            // Start thread that waits for connection requests from 
-            // new followers.
-            cnxAcceptor = new LearnerCnxAcceptor();
-            cnxAcceptor.start();
-            
+            readyToStart = true;
+            waitForEpochAck(self.getId(), leaderStateSummary);
+            self.setCurrentEpoch(epoch);
+
             // We have to get at least a majority of servers in sync with
             // us. We do this by waiting for the NEWLEADER packet to get
             // acknowledged
@@ -384,7 +410,7 @@ public class Leader {
         }
     }
 
     boolean isShutdown;
 
     /**
      * Close down all the LearnerHandlers
@@ -625,7 +651,8 @@ public class Leader {
      * @return
      */
     public long getEpoch(){
-        return lastProposed >> 32L;
+        // the epoch occupies the high-order 32 bits of the last proposed zxid
+        return ZxidUtils.getEpochFromZxid(lastProposed);
     }
     
     /**
@@ -740,4 +767,99 @@ public class Leader {
         return lastProposed;
     }
 
+    /** Sids of participants (this leader included) that have reported their last accepted epoch. */
+    private final HashSet<Long> connectingFollowers = new HashSet<Long>();
+
+    /**
+     * Computes the epoch this leader will propose. Every participant (this
+     * leader and each connecting participant) reports the last epoch it
+     * accepted; the proposed epoch is strictly greater than all of them.
+     * Callers block until this leader and a quorum have reported, or until
+     * initLimit * tickTime milliseconds have elapsed.
+     *
+     * @param sid the server id of the reporting participant
+     * @param lastAcceptedEpoch the last epoch that participant accepted
+     * @return the epoch to propose for the new leadership
+     * @throws InterruptedException if interrupted, or if this leader and a
+     *         quorum have not reported before the timeout
+     */
+    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
+        synchronized(connectingFollowers) {
+            if (!waitingForNewEpoch) {
+                return epoch;
+            }
+            // the new epoch must exceed every accepted epoch, so an equal
+            // value has to bump it as well
+            if (lastAcceptedEpoch >= epoch) {
+                epoch = lastAcceptedEpoch + 1;
+            }
+            connectingFollowers.add(sid);
+            QuorumVerifier verifier = self.getQuorumVerifier();
+            // do not settle on an epoch before the leader's own accepted epoch is known
+            if (connectingFollowers.contains(self.getId())
+                    && verifier.containsQuorum(connectingFollowers)) {
+                waitingForNewEpoch = false;
+                connectingFollowers.notifyAll();
+            } else {
+                // loop to tolerate spurious wakeups; wait at most initLimit ticks overall
+                long end = System.currentTimeMillis() + self.getInitLimit() * self.getTickTime();
+                long remaining = end - System.currentTimeMillis();
+                while (waitingForNewEpoch && remaining > 0) {
+                    connectingFollowers.wait(remaining);
+                    remaining = end - System.currentTimeMillis();
+                }
+                if (waitingForNewEpoch) {
+                    throw new InterruptedException("Out of time to propose an epoch");
+                }
+            }
+            return epoch;
+        }
+    }
+
+    /** Sids of participants whose epoch acknowledgement counts toward the quorum. */
+    private final HashSet<Long> electingFollowers = new HashSet<Long>();
+    private boolean electionFinished = false;
+
+    /**
+     * Records a participant's acknowledgement of the proposed epoch and blocks
+     * until a quorum has acknowledged it (once this leader is ready to start),
+     * or until initLimit * tickTime milliseconds have elapsed.
+     *
+     * @param id the server id of the acknowledging participant
+     * @param ss the participant's state summary; a current epoch of -1 marks a
+     *           reply that must not be counted as an acknowledgement
+     * @throws IOException if the participant's state is more recent than the leader's
+     * @throws InterruptedException if interrupted, or if no quorum acknowledged
+     *         the epoch before the timeout
+     */
+    public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
+        synchronized(electingFollowers) {
+            if (electionFinished) {
+                return;
+            }
+            if (ss.getCurrentEpoch() != -1) {
+                if (ss.isMoreRecentThan(leaderStateSummary)) {
+                    throw new IOException("Follower is ahead of the leader");
+                }
+                electingFollowers.add(id);
+            }
+            QuorumVerifier verifier = self.getQuorumVerifier();
+            if (readyToStart && verifier.containsQuorum(electingFollowers)) {
+                electionFinished = true;
+                electingFollowers.notifyAll();
+            } else {
+                // loop to tolerate spurious wakeups; wait at most initLimit ticks overall
+                long end = System.currentTimeMillis() + self.getInitLimit() * self.getTickTime();
+                long remaining = end - System.currentTimeMillis();
+                while (!electionFinished && remaining > 0) {
+                    electingFollowers.wait(remaining);
+                    remaining = end - System.currentTimeMillis();
+                }
+                // the flag to test is electionFinished, not waitingForNewEpoch
+                if (!electionFinished) {
+                    throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
+                }
+            }
+        }
+    }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Tue Jun 14 05:14:05 2011
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map.Entry;
@@ -44,6 +45,7 @@ import org.apache.zookeeper.server.Serve
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -72,7 +74,9 @@ public class Learner {       
     }
     
     protected InputArchive leaderIs;
-    protected OutputArchive leaderOs;    
+    protected OutputArchive leaderOs;  
+    /** protocol version sent by the leader in its LEADERINFO packet; stays 0x01 when the leader sends none */
+    protected int leaderProtocolVersion = 0x01;
     
     protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
 
@@ -250,22 +254,48 @@ public class Learner {       
         /*
          * Send follower info, including last zxid and sid
          */
+    	long lastLoggedZxid = self.getLastLoggedZxid();
         QuorumPacket qp = new QuorumPacket();                
         qp.setType(pktType);
-        long sentLastZxid = self.getLastLoggedZxid();
-        qp.setZxid(sentLastZxid);
+        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
         
         /*
          * Add sid to payload
          */
+        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
         ByteArrayOutputStream bsid = new ByteArrayOutputStream();
-        DataOutputStream dsid = new DataOutputStream(bsid);
-        dsid.writeLong(self.getId());
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
+        boa.writeRecord(li, "LearnerInfo");
         qp.setData(bsid.toByteArray());
         
         writePacket(qp, true);
         readPacket(qp);        
-
+        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
+		if (qp.getType() == Leader.LEADERINFO) {
+        	// we are connected to a 1.0 server so accept the new epoch and read the next packet
+        	leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
+        	byte epochBytes[] = new byte[4];
+        	final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
+        	if (newEpoch > self.getAcceptedEpoch()) {
+        		wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
+        		self.setAcceptedEpoch(newEpoch);
+        	} else if (newEpoch == self.getAcceptedEpoch()) {
+        		// since we have already acked an epoch equal to the leaders, we cannot ack
+        		// again, but we still need to send our lastZxid to the leader so that we can
+        		// sync with it if it does assume leadership of the epoch.
+        		// the -1 indicates that this reply should not count as an ack for the new epoch
+                wrappedEpochBytes.putInt(-1);
+        	} else {
+        		throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
+        	}
+        	QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
+        	writePacket(ackNewEpoch, true);
+        	readPacket(qp);
+        } else {
+        	if (newEpoch > self.getAcceptedEpoch()) {
+        		self.setAcceptedEpoch(newEpoch);
+        	}
+        }
         if (qp.getType() != Leader.NEWLEADER) {
             LOG.error("First packet should have been NEWLEADER");
             throw new IOException("First packet should have been NEWLEADER");
@@ -321,9 +351,6 @@ public class Learner {       
 
             }
             zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-            if(LOG.isInfoEnabled()){
-                LOG.info("Setting leader epoch " + Long.toHexString(newLeaderZxid >> 32L));
-            }
                         
             long lastQueued = 0;
             // we are now going to start getting transactions to apply followed by an UPTODATE
@@ -369,7 +396,9 @@ public class Learner {       
                 }
             }
         }
-        ack.setZxid(newLeaderZxid & ~0xffffffffL);
+        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
+        self.setCurrentEpoch(newEpoch);
+        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         writePacket(ack, true);
         sock.setSoTimeout(self.tickTime * self.syncLimit);
         zk.startup();

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Tue Jun 14 05:14:05 2011
@@ -40,10 +40,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -73,6 +75,14 @@ public class LearnerHandler extends Thre
         return sid;
     }                    
 
+    /** protocol version from the learner's LearnerInfo; stays 0x1 if it sent only its 8-byte sid */
+    protected int version = 0x1;
+    
+    /** @return the protocol version of the connected learner */
+    int getVersion() {
+    	return version;
+    }
+    
     /**
      * The packets to be sent to the learner
      */
@@ -226,8 +234,7 @@ public class LearnerHandler extends Thre
      */
     @Override
     public void run() {
-        try {
-            
+        try {            
             ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
                     .getInputStream()));
             bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
@@ -240,9 +247,17 @@ public class LearnerHandler extends Thre
                         + " is not FOLLOWERINFO or OBSERVERINFO!");
                 return;
             }
-            if (qp.getData() != null) {
-            	ByteBuffer bbsid = ByteBuffer.wrap(qp.getData());
-                this.sid = bbsid.getLong();
+            byte learnerInfoData[] = qp.getData();
+            if (learnerInfoData != null) {
+            	if (learnerInfoData.length == 8) {
+            		ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
+            		this.sid = bbsid.getLong();
+            	} else {
+            		LearnerInfo li = new LearnerInfo();
+            		ZooKeeperServer.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
+            		this.sid = li.getServerid();
+            		this.version = li.getProtocolVersion();
+            	}
             } else {
             	this.sid = leader.followerCounter.getAndDecrement();
             }
@@ -254,7 +269,42 @@ public class LearnerHandler extends Thre
                   learnerType = LearnerType.OBSERVER;
             }            
             
-            long peerLastZxid = qp.getZxid();
+            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
+            
+            long peerLastZxid;
+            StateSummary ss = null;
+            if (learnerType == LearnerType.PARTICIPANT) {
+            	long zxid = qp.getZxid();
+				long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
+				
+				if (this.getVersion() < 0x10000) {
+					// we are going to have to extrapolate the epoch information
+					long epoch = ZxidUtils.getEpochFromZxid(zxid);
+					ss = new StateSummary(epoch, zxid);
+					// fake the message
+					leader.waitForEpochAck(this.getSid(), ss);
+				} else {
+					byte ver[] = new byte[4];
+					ByteBuffer.wrap(ver).putInt(0x10000);
+				    QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
+				    oa.writeRecord(newEpochPacket, "packet");
+		            bufferedOutput.flush();
+		            QuorumPacket ackEpochPacket = new QuorumPacket();
+		            ia.readRecord(ackEpochPacket, "packet");
+		            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
+		             	LOG.error(ackEpochPacket.toString()
+		                        + " is not ACKEPOCH");
+		                return;
+		            }
+            		ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
+		            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
+		            leader.waitForEpochAck(this.getSid(), ss);
+				}
+            	peerLastZxid = ss.getLastZxid();
+            } else {
+            	peerLastZxid = qp.getZxid();
+            }
+            
             /* the default to send to the follower */
             int packetToSend = Leader.SNAP;
             long zxidToSend = 0;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Jun 14 05:14:05 2011
@@ -17,8 +17,14 @@
  */
 package org.apache.zookeeper.server.quorum;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
@@ -41,6 +47,7 @@ import org.apache.zookeeper.server.ZooKe
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.ZxidUtils;
 
 /**
  * This class manages the quorum protocol. There are three states this server
@@ -395,16 +402,63 @@ public class QuorumPeer extends Thread i
     
     @Override
     public synchronized void start() {
-        try {
+        // the database and epoch files must be loaded before clients are
+        // served and before the initial vote is built from them
+        loadDataBase();
+        cnxnFactory.start();        
+        startLeaderElection();
+        super.start();
+    }
+
+    /**
+     * Loads the on-disk database and the current/accepted epoch files. A
+     * missing epoch file is created from the epoch of the last processed zxid
+     * (expected only once, when upgrading from a version without these files).
+     *
+     * @throws RuntimeException if the database or an epoch file cannot be read
+     *         or written, or if the stored epochs are inconsistent with each
+     *         other or with the last processed zxid
+     */
+    private void loadDataBase() {
+        try {
             zkDb.loadDataBase();
+
+            // load the epochs
+            long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
+            long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
+            try {
+                currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
+            } catch(FileNotFoundException e) {
+                // pick a reasonable epoch number
+                // this should only happen once when moving to a
+                // new code version
+                LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default. This should only happen when you are upgrading your installation");
+                currentEpoch = epochOfZxid;
+                writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
+            }
+            if (epochOfZxid > currentEpoch) {
+                throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
+            }
+            try {
+                acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
+            } catch(FileNotFoundException e) {
+                // pick a reasonable epoch number
+                // this should only happen once when moving to a
+                // new code version
+                LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default. This should only happen when you are upgrading your installation");
+                acceptedEpoch = epochOfZxid;
+                // must go to its own file: writing CURRENT_EPOCH_FILENAME here would
+                // clobber the current epoch and leave the accepted-epoch file missing
+                writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
+            }
+            if (acceptedEpoch < currentEpoch) {
+                throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
+            }
         } catch(IOException ie) {
             LOG.error("Unable to load database on disk", ie);
             throw new RuntimeException("Unable to run quorum server ", ie);
         }
-        cnxnFactory.start();        
-        startLeaderElection();
-        super.start();
-    }
+    }
 
     ResponderThread responder;
     
@@ -413,7 +454,13 @@ public class QuorumPeer extends Thread i
         responder.interrupt();
     }
     synchronized public void startLeaderElection() {
-        currentVote = new Vote(myid, getLastLoggedZxid());
+        try {
+            // the initial vote also carries this peer's current epoch
+            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
+        } catch(IOException e) {
+            // keep the IOException as the cause rather than copying its message and stack trace
+            throw new RuntimeException(e);
+        }
         for (QuorumServer p : getView().values()) {
             if (p.id == myid) {
                 myQuorumAddr = p.addr;
@@ -487,16 +534,12 @@ public class QuorumPeer extends Thread i
      * @return the highest zxid for this host
      */
     public long getLastLoggedZxid() {
-        long lastLogged= -1L;
-        try {
-            if (!zkDb.isInitialized()) {
-                zkDb.loadDataBase();
-            }
-            lastLogged = zkDb.getDataTreeLastProcessedZxid();
-        } catch(IOException ie) {
-            LOG.warn("Unable to load database ", ie);
+        // loadDataBase() also reads the epoch files; unlike the old code, a load
+        // failure now surfaces as a RuntimeException instead of returning -1
+        if (!zkDb.isInitialized()) {
+        	loadDataBase();
         }
-        return lastLogged;
+        return zkDb.getDataTreeLastProcessedZxid();
     }
     
     public Follower follower;
@@ -1002,5 +1043,112 @@ public class QuorumPeer extends Thread i
      */
     public QuorumCnxManager getQuorumCnxManager() {
         return qcm;
-}
+    }
+
+    /**
+     * Reads a single decimal long from the first line of a file in the
+     * snapshot directory.
+     *
+     * @param name the file name, relative to the snapshot directory
+     * @return the value stored in the file
+     * @throws FileNotFoundException if the file does not exist
+     * @throws IOException if the file cannot be read or its first line is
+     *         not a parsable long
+     */
+    private long readLongFromFile(String name) throws IOException {
+        File file = new File(logFactory.getSnapDir(), name);
+        BufferedReader br = new BufferedReader(new FileReader(file));
+        String line = "";
+        try {
+            line = br.readLine();
+            return Long.parseLong(line);
+        } catch(NumberFormatException e) {
+            throw new IOException("Found " + line + " in " + file, e);
+        } finally {
+            br.close();
+        }
+    }
+
+    /** Cached accepted epoch; -1 until it is loaded or set. */
+    private long acceptedEpoch = -1;
+    /** Cached current epoch; -1 until it is loaded or set. */
+    private long currentEpoch = -1;
+
+    /** Name of the file in the snapshot directory holding the current epoch. */
+    public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";
+
+    /** Name of the file in the snapshot directory holding the accepted epoch. */
+    public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
+
+    /**
+     * Writes a long as decimal text to a file in the snapshot directory and
+     * syncs the file to the storage device before returning.
+     *
+     * NOTE(review): the file is truncated and rewritten in place, so a crash
+     * before the sync completes can leave it empty or partial; consider
+     * writing a temporary file and renaming it over the original.
+     *
+     * @param name the file name, relative to the snapshot directory
+     * @param value the value to store
+     * @throws IOException if the file cannot be written or synced
+     */
+    private void writeLongToFile(String name, long value) throws IOException {
+        File file = new File(logFactory.getSnapDir(), name);
+        FileOutputStream out = new FileOutputStream(file);
+        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
+        try {
+            bw.write(Long.toString(value));
+            bw.flush();
+            // flush only reaches the OS; sync forces the bytes to the device
+            out.getFD().sync();
+        } finally {
+            bw.close();
+        }
+    }
+
+    /**
+     * @return the current epoch, read from CURRENT_EPOCH_FILENAME the first
+     *         time it is needed
+     * @throws IOException if the epoch file cannot be read
+     */
+    public long getCurrentEpoch() throws IOException {
+        if (currentEpoch == -1) {
+            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
+        }
+        return currentEpoch;
+    }
+
+    /**
+     * @return the accepted epoch, read from ACCEPTED_EPOCH_FILENAME the first
+     *         time it is needed
+     * @throws IOException if the epoch file cannot be read
+     */
+    public long getAcceptedEpoch() throws IOException {
+        if (acceptedEpoch == -1) {
+            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
+        }
+        return acceptedEpoch;
+    }
+
+    /**
+     * Sets the current epoch and persists it to CURRENT_EPOCH_FILENAME.
+     *
+     * @param e the new current epoch
+     * @throws IOException if the epoch file cannot be written
+     */
+    public void setCurrentEpoch(long e) throws IOException {
+        currentEpoch = e;
+        writeLongToFile(CURRENT_EPOCH_FILENAME, e);
+    }
+
+    /**
+     * Sets the accepted epoch and persists it to ACCEPTED_EPOCH_FILENAME.
+     *
+     * @param e the new accepted epoch
+     * @throws IOException if the epoch file cannot be written
+     */
+    public void setAcceptedEpoch(long e) throws IOException {
+        acceptedEpoch = e;
+        writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
+    }
 }

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/StateSummary.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/StateSummary.java?rev=1135382&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/StateSummary.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/StateSummary.java Tue Jun 14 05:14:05 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+/**
+ * Summarizes a server's state as a (current epoch, last zxid) pair and
+ * defines how two such states are compared: a greater current epoch is
+ * more recent, and for equal epochs a greater last zxid is more recent.
+ */
+public class StateSummary {
+	private long currentEpoch;
+	private long lastZxid;
+	/**
+	 * @param currentEpoch the server's current epoch
+	 * @param lastZxid the most recent zxid reported for the server
+	 */
+	public StateSummary(long currentEpoch, long lastZxid) {
+		this.currentEpoch = currentEpoch;
+		this.lastZxid = lastZxid;
+	}
+	
+	public long getCurrentEpoch() {
+		return currentEpoch;
+	}
+	
+	public long getLastZxid() {
+		return lastZxid;
+	}
+	
+	/**
+	 * @param ss the state to compare against
+	 * @return true if this state has a greater current epoch than {@code ss},
+	 *         or the same current epoch and a greater last zxid; false if the
+	 *         two summaries are equal
+	 */
+	public boolean isMoreRecentThan(StateSummary ss) {
+		return (currentEpoch > ss.currentEpoch) || (currentEpoch == ss.currentEpoch && lastZxid > ss.lastZxid);
+	}
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof StateSummary)) {
+			return false;
+		}
+		StateSummary ss = (StateSummary)obj;
+		return currentEpoch == ss.currentEpoch && lastZxid == ss.lastZxid;
+	}
+	
+	@Override
+	public int hashCode() {
+		return (int)(currentEpoch ^ lastZxid);
+	}
+}

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java Tue Jun 14 05:14:05 2011
@@ -22,29 +22,53 @@ import org.apache.zookeeper.server.quoru
 
 
 public class Vote {
+    
     public Vote(long id, long zxid) {
         this.id = id;
         this.zxid = zxid;
     }
+    
+    /**
+     * Creates a vote with a peer epoch; the election epoch stays at its
+     * default of -1.
+     *
+     * NOTE(review): before this change the third argument was the election
+     * epoch; it now sets peerEpoch, so pre-existing three-argument callers
+     * silently change meaning -- verify every call site was updated.
+     */
+    public Vote(long id, long zxid, long peerEpoch) {
+        this.id = id;
+        this.zxid = zxid;
+        this.peerEpoch = peerEpoch;
+    }
 
-    public Vote(long id, long zxid, long epoch) {
+    /** Creates a vote carrying both the election epoch and the peer epoch. */
+    public Vote(long id, long zxid, long electionEpoch, long peerEpoch) {
         this.id = id;
         this.zxid = zxid;
-        this.epoch = epoch;
+        this.electionEpoch = electionEpoch;
+        this.peerEpoch = peerEpoch;
     }
     
-    public Vote(long id, long zxid, long epoch, ServerState state) {
+    /** Creates a vote carrying both epochs and the given server state. */
+    public Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
         this.id = id;
         this.zxid = zxid;
-        this.epoch = epoch;
+        this.electionEpoch = electionEpoch;
         this.state = state;
+        this.peerEpoch = peerEpoch;
     }
     
     public long id;
     
     public long zxid;
     
-    public long epoch = -1;
+    // election epoch of this vote (this field was named "epoch" before); -1 when unset
+    public long electionEpoch = -1;
+    
+    // epoch paired with zxid; QuorumPeer.startLeaderElection fills it from
+    // getCurrentEpoch() for this peer's own initial vote; -1 when unset
+    public long peerEpoch = -1;
     
     public ServerState state = ServerState.LOOKING;
     
@@ -54,7 +78,8 @@ public class Vote {
             return false;
         }
         Vote other = (Vote) o;
-        return (id == other.id && zxid == other.zxid && epoch == other.epoch);
+        // both epochs take part in equality; state does not
+        return (id == other.id && zxid == other.zxid && electionEpoch == other.electionEpoch && peerEpoch == other.peerEpoch);
 
     }
 
@@ -64,6 +89,7 @@ public class Vote {
     }
 
     public String toString() {
-        return "(" + id + ", " + Long.toHexString(zxid) + ")";
+        // electionEpoch and state are not included
+        return "(" + id + ", " + Long.toHexString(zxid) + ", " + Long.toHexString(peerEpoch) + ")";
     }
 }

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ZxidUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ZxidUtils.java?rev=1135382&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ZxidUtils.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ZxidUtils.java Tue Jun 14 05:14:05 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+/**
+ * Helpers for composing and decomposing zxids: the epoch occupies the
+ * high-order 32 bits and a counter the low-order 32 bits.
+ */
+public final class ZxidUtils {
+	private ZxidUtils() {
+		// static helpers only; not meant to be instantiated
+	}
+
+	/**
+	 * @param zxid a zxid
+	 * @return the epoch, i.e. the high-order 32 bits; the shift is
+	 *         arithmetic, so a negative zxid such as -1 yields -1
+	 */
+	static public long getEpochFromZxid(long zxid) {
+		return zxid >> 32L;
+	}
+
+	/**
+	 * @param zxid a zxid
+	 * @return the counter, i.e. the low-order 32 bits
+	 */
+	static public long getCounterFromZxid(long zxid) {
+		return zxid & 0xffffffffL;
+	}
+
+	/**
+	 * @param epoch the epoch, placed in the high-order 32 bits
+	 * @param counter the counter; only its low-order 32 bits are used
+	 * @return the combined zxid
+	 */
+	static public long makeZxid(long epoch, long counter) {
+		return (epoch << 32L) | (counter & 0xffffffffL);
+	}
+
+	/**
+	 * @param zxid a zxid (callers also pass bare epochs)
+	 * @return the value as lowercase hexadecimal, without a "0x" prefix
+	 */
+	static public String zxidToString(long zxid) {
+		return Long.toHexString(zxid);
+	}
+}

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Tue Jun 14 05:14:05 2011
@@ -26,6 +26,7 @@ import java.io.StringReader;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+import java.util.Map;
 import java.util.regex.Pattern;
 
 import org.apache.log4j.Layout;
@@ -33,9 +34,15 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.WriterAppender;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,7 +53,7 @@ import org.junit.Test;
  *
  */
 public class QuorumPeerMainTest extends QuorumPeerTestBase {
-    /**
+	/**
      * Verify the ability to start a cluster.
      */
     @Test
@@ -103,6 +110,148 @@ public class QuorumPeerMainTest extends 
     }
 
     /**
+     * Test early leader abandonment: a create that a leader proposed but
+     * could not commit before losing its followers must not survive after
+     * the remaining servers elect a new leader and the old leader rejoins.
+     */
+    @Test
+    public void testEarlyLeaderAbandonment() throws Exception {
+        ClientBase.setupTestEnv();
+
+        final int SERVER_COUNT = 3;
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
+        }
+        String quorumCfgSection = sb.toString();
+
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+        }
+
+        waitForAll(zk, States.CONNECTED);
+
+        // we need to shutdown and start back up to make sure that the create session isn't the first transaction since
+        // that is rather innocuous.
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            mt[i].shutdown();
+        }
+
+        waitForAll(zk, States.CONNECTING);
+
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            mt[i].start();
+        }
+
+        waitForAll(zk, States.CONNECTED);
+
+        // ok lets find the leader and kill everything else, we have a few
+        // seconds, so it should be plenty of time
+        int leader = -1;
+        Map<Long, Proposal> outstanding = null;
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            if (mt[i].main.quorumPeer.leader == null) {
+                mt[i].shutdown();
+            } else {
+                leader = i;
+                outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
+            }
+        }
+        // fail clearly instead of with an ArrayIndexOutOfBoundsException below
+        Assert.assertTrue("No server was leading", leader != -1);
+
+        try {
+            zk[leader].create("/zk"+leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            Assert.fail("create /zk" + leader + " should have failed");
+        } catch(KeeperException e) {
+            // expected: the followers are down, so the create cannot be committed
+        }
+
+        // just make sure that we actually did get it in process at the
+        // leader
+        Assert.assertEquals(1, outstanding.size());
+        Assert.assertEquals(OpCode.create, outstanding.values().iterator().next().request.hdr.getType());
+        // make sure it has a chance to write it to disk
+        Thread.sleep(1000);
+        mt[leader].shutdown();
+        waitForAll(zk, States.CONNECTING);
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                mt[i].start();
+            }
+        }
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                waitForOne(zk[i], States.CONNECTED);
+                zk[i].create("/zk" + i, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+        }
+
+        mt[leader].start();
+        waitForAll(zk, States.CONNECTED);
+        // make sure everything is consistent
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            for(int j = 0; j < SERVER_COUNT; j++) {
+                String server = j == leader ? ("Leader (" + leader + ")") : ("Follower " + j);
+                if (i == leader) {
+                    Assert.assertTrue(server + " should not have /zk" + i, zk[j].exists("/zk"+i, false) == null);
+                } else {
+                    Assert.assertTrue(server + " does not have /zk" + i, zk[j].exists("/zk"+i, false) != null);
+                }
+            }
+        }
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            zk[i].close();
+        }
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            mt[i].shutdown();
+        }
+    }
+
+    /**
+     * Polls every 500 ms until the client reaches the given state, failing
+     * after about ten seconds (the same budget as waitForAll) instead of
+     * hanging the test forever.
+     */
+    private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
+        int iterations = 20;
+        while (zk.getState() != state && iterations-- > 0) {
+            Thread.sleep(500);
+        }
+        Assert.assertEquals("Waiting too long", state, zk.getState());
+    }
+
+    /**
+     * Polls once a second until every client reports the given state, throwing
+     * after ten unsuccessful rounds. A one-second sleep follows every round,
+     * including the one that succeeds.
+     */
+    private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
+        int iterations = 10;
+        boolean someoneNotConnected = true;
+        while(someoneNotConnected) {
+            if (iterations-- == 0) {
+                throw new RuntimeException("Waiting too long");
+            }
+
+            someoneNotConnected = false;
+            for(ZooKeeper zk: zks) {
+                if (zk.getState() != state) {
+                    someoneNotConnected = true;
+                }
+            }
+            Thread.sleep(1000);
+        }
+    }
+
+    /**
      * Verify handling of bad quorum address
      */
     @Test

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Tue Jun 14 05:14:05 2011
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.QuorumBase;
 
@@ -54,20 +56,18 @@ public class QuorumPeerTestBase extends 
         }
     }
 
-    public static class MainThread extends Thread {
+    public static class MainThread implements Runnable {
         final File confFile;
-        final TestQPMain main;
+        volatile TestQPMain main;
 
         public MainThread(int myid, int clientPort, String quorumCfgSection)
             throws IOException
         {
-            super("QuorumPeer with myid:" + myid
-                    + " and clientPort:" + clientPort);
             File tmpDir = ClientBase.createTmpDir();
             confFile = new File(tmpDir, "zoo.cfg");
 
             FileWriter fwriter = new FileWriter(confFile);
-            fwriter.write("tickTime=2000\n");
+            fwriter.write("tickTime=4000\n");
             fwriter.write("initLimit=10\n");
             fwriter.write("syncLimit=5\n");
 
@@ -94,10 +94,14 @@ public class QuorumPeerTestBase extends 
             fwriter.write(Integer.toString(myid));
             fwriter.flush();
             fwriter.close();
-
-            main = new TestQPMain();
         }
 
+        Thread currentThread;
+        synchronized public void start() {
+        	main = new TestQPMain();
+        	currentThread = new Thread(this);
+        	currentThread.start();
+        }
         public void run() {
             String args[] = new String[1];
             args[0] = confFile.toString();
@@ -106,11 +110,27 @@ public class QuorumPeerTestBase extends 
             } catch (Exception e) {
                 // test will still fail even though we just log/ignore
                 LOG.error("unexpected exception in run", e);
+            } finally {
+            	currentThread = null;
             }
         }
 
-        public void shutdown() {
-            main.shutdown();
+        /**
+         * Stops the peer if its thread is still running. It calls {@code main.shutdown()}
+         * and then waits up to 500 ms for the thread to exit. Nothing happens when no
+         * thread is recorded or the recorded thread has already finished.
+         *
+         * @throws InterruptedException if interrupted while waiting for the thread
+         */
+        public void shutdown() throws InterruptedException {
+            final Thread runner = currentThread;
+            if (runner == null || !runner.isAlive()) {
+                return;
+            }
+            main.shutdown();
+            runner.join(500);
         }
+        /**
+         * Waits up to {@code timeout} ms for the peer's thread to finish. Returns
+         * immediately if no thread is currently recorded.
+         *
+         * @param timeout maximum wait in milliseconds, with {@link Thread#join(long)}
+         *                semantics (0 waits indefinitely)
+         * @throws InterruptedException if interrupted while waiting
+         */
+        public void join(long timeout) throws InterruptedException {
+            final Thread runner = currentThread;
+            if (runner == null) {
+                return;
+            }
+            runner.join(timeout);
+        }
+        /**
+         * Reports whether a peer thread is recorded and is still running.
+         *
+         * @return {@code true} if the recorded thread is alive, {@code false} otherwise
+         */
+        public boolean isAlive() {
+            final Thread runner = currentThread;
+            if (runner == null) {
+                return false;
+            }
+            return runner.isAlive();
+        }
     }
 }

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Jun 14 05:14:05 2011
@@ -448,7 +448,7 @@ public abstract class ClientBase extends
 
     protected void tearDownAll() throws Exception {
         synchronized (this) {
-            for (ZooKeeper zk : allClients) {
+            if (allClients != null) for (ZooKeeper zk : allClients) {
                 try {
                     if (zk != null)
                         zk.close();

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java Tue Jun 14 05:14:05 2011
@@ -199,6 +199,7 @@ public class ReadOnlyModeTest extends Qu
             watcher.reset();
             qu.start(2);
             qu.start(3);
+            ClientBase.waitForServerUp(qu.getConnString(), 2000);
             watcher.waitForConnected(CONNECTION_TIMEOUT);
             zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);

Modified: zookeeper/trunk/src/zookeeper.jute
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/zookeeper.jute?rev=1135382&r1=1135381&r2=1135382&view=diff
==============================================================================
--- zookeeper/trunk/src/zookeeper.jute (original)
+++ zookeeper/trunk/src/zookeeper.jute Tue Jun 14 05:14:05 2011
@@ -196,6 +196,10 @@ module org.apache.zookeeper.proto {
 }
 
 module org.apache.zookeeper.server.quorum {
+    class LearnerInfo {
+        long serverid;
+        int protocolVersion;
+    }
     class QuorumPacket {
         int type; // Request, Ack, Commit, Ping
         long zxid;



Mime
View raw message