zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mic...@apache.org
Subject svn commit: r1608648 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/
Date Tue, 08 Jul 2014 03:25:57 GMT
Author: michim
Date: Tue Jul  8 03:25:56 2014
New Revision: 1608648

URL: http://svn.apache.org/r1608648
Log:
ZOOKEEPER-1810. Add version to FLE notifications for trunk (Germán Blanco via michim)

Added:
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
Removed:
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEBackwardElectionRoundTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETestUtils.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1608648&r1=1608647&r2=1608648&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue Jul  8 03:25:56 2014
@@ -683,6 +683,9 @@ BUGFIXES:
   ZOOKEEPER-1835. dynamic configuration file renaming fails on Windows
   (Bruno Freudensprung via rakeshr)
 
+  ZOOKEEPER-1810. Add version to FLE notifications for trunk Germán Blanco via
+  michim)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1608648&r1=1608647&r2=1608648&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Tue Jul  8 03:25:56 2014
@@ -95,6 +95,13 @@ public class FastLeaderElection implemen
 
     static public class Notification {
         /*
+         * Format version, introduced in 3.4.6
+         */
+
+        public final static int CURRENTVERSION = 0x2;
+        int version;
+
+        /*
          * Proposed leader
          */
         long leader;
@@ -125,9 +132,9 @@ public class FastLeaderElection implemen
          */
         long peerEpoch;
     }
-    
+
     static byte[] dummyData = new byte[0];
-    
+
     /**
      * Messages that a peer wants to send to other peers.
      * These messages can be both Notifications and Acks
@@ -142,16 +149,15 @@ public class FastLeaderElection implemen
                 long electionEpoch,
                 ServerState state,
                 long sid,
-                long peerEpoch,                
+                long peerEpoch,
                 byte[] configData) {
 
-
             this.leader = leader;
             this.zxid = zxid;
             this.electionEpoch = electionEpoch;
             this.state = state;
             this.sid = sid;
-            this.peerEpoch = peerEpoch;            
+            this.peerEpoch = peerEpoch;
             this.configData = configData;
         }
 
@@ -184,7 +190,7 @@ public class FastLeaderElection implemen
          * Used to send a QuorumVerifier (configuration info)
          */
         byte[] configData = dummyData;
-        
+
         /*
          * Leader epoch
          */
@@ -201,7 +207,7 @@ public class FastLeaderElection implemen
      * spawns a new thread.
      */
 
-    private class Messenger {
+    protected class Messenger {
 
         /**
          * Receives messages from instance of QuorumCnxManager on
@@ -223,78 +229,91 @@ public class FastLeaderElection implemen
                 Message response;
                 while (!stop) {
                     // Sleeps on receive
-                    try{
+                    try {
                         response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                         if(response == null) continue;
-                        
+
                         // The current protocol and two previous generations all send at least 28 bytes
                         if (response.buffer.capacity() < 28) {
                             LOG.error("Got a short response: " + response.buffer.capacity());
                             continue;
                         }
-                        
+
                         // this is the backwardCompatibility mode in place before ZK-107
                         // It is for a version of the protocol in which we didn't send peer epoch
-                        // With peer epoch the message became 36 bytes
+                        // With peer epoch and version the message became 40 bytes
                         boolean backCompatibility28 = (response.buffer.capacity() == 28);
-                        
-                        // ZK-107 sends the configuration info in every message.
-                        // So messages are 36 bytes + size of configuration info 
-                        // (variable length, shoulld be at the end of the message).
-                        boolean backCompatibility36 = (response.buffer.capacity() == 36);
 
+                        // this is the backwardCompatibility mode for no version information
+                        boolean backCompatibility40 = (response.buffer.capacity() == 40);
+                        
                         response.buffer.clear();
+
+                        // Instantiate Notification and set its attributes
+                        Notification n = new Notification();
+
                         int rstate = response.buffer.getInt();
                         long rleader = response.buffer.getLong();
                         long rzxid = response.buffer.getLong();
                         long relectionEpoch = response.buffer.getLong();
                         long rpeerepoch;
-                        
-                        if(!backCompatibility28){
-                           rpeerepoch = response.buffer.getLong();
-                        } else {
-                            if(LOG.isInfoEnabled()){
-                                LOG.info("Backward compatibility mode (28 bits), server id: " + response.sid);
+
+                        int version = 0x0;
+                        if (!backCompatibility28) {
+                            rpeerepoch = response.buffer.getLong();
+                            if (!backCompatibility40) {
+                                /*
+                                 * Version added in 3.4.6
+                                 */
+                                
+                                version = response.buffer.getInt();
+                            } else {
+                                LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
                             }
+                        } else {
+                            LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
                             rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                         }
-                        
+
                         QuorumVerifier rqv = null;
-                        
-                        // check if we have more than 36 bytes. If so extract config info from message.
-                        if(!backCompatibility28 && !backCompatibility36){
-                           byte b[] = new byte[response.buffer.remaining()];
-                           response.buffer.get(b);
+
+                        // check if we have a version that includes config. If so extract config info from message.
+                        if (version > 0x1) {
+                            int configLength = response.buffer.getInt();
+                            byte b[] = new byte[configLength];
+
+                            response.buffer.get(b);
                                                        
-                           synchronized(self){                             
-                               try {
-                                   rqv = self.configFromString(new String(b));
-                                   QuorumVerifier curQV = self.getQuorumVerifier();
-                                   if (rqv.getVersion() > curQV.getVersion()) {
-                                       LOG.info(self.getId() + " Received version: " + Long.toHexString(rqv.getVersion()) + " my version: " + Long.toHexString(self.getQuorumVerifier().getVersion()));
-                                       self.processReconfig(rqv, null, null, false);
-                                       if (!rqv.equals(curQV)) {
-                                           LOG.info("restarting leader election");
-                                           self.shuttingDownLE = true;
-                                           self.getElectionAlg().shutdown();
-                                           break;
+                            synchronized(self) {
+                                try {
+                                    rqv = self.configFromString(new String(b));
+                                    QuorumVerifier curQV = self.getQuorumVerifier();
+                                    if (rqv.getVersion() > curQV.getVersion()) {
+                                        LOG.info("{} Received version: {} my version: {}", self.getId(),
+                                                Long.toHexString(rqv.getVersion()),
+                                                Long.toHexString(self.getQuorumVerifier().getVersion()));
+                                        self.processReconfig(rqv, null, null, false);
+                                        if (!rqv.equals(curQV)) {
+                                            LOG.info("restarting leader election");
+                                            self.shuttingDownLE = true;
+                                            self.getElectionAlg().shutdown();
+                                            
+                                            break;
                                        }
-                                   }           
-                               } catch (IOException e) {                         
-                                   LOG.error("Something went wrong while processing config received from " + response.sid);
+                                    }
+                                } catch (IOException e) {
+                                    LOG.error("Something went wrong while processing config received from {}", response.sid);
                                } catch (ConfigException e) {
-                                   LOG.error("Something went wrong while processing config received from " + response.sid);
-                               } 
-                          }                                                       
+                                   LOG.error("Something went wrong while processing config received from {}", response.sid);
+                               }
+                            }                          
                         } else {
-                            if(LOG.isInfoEnabled()){
-                                LOG.info("Backward compatibility mode (before reconfig), server id: " + response.sid);
-                            }
+                            LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
                         }
                        
                         /*
-                         * If it is from a non-voting server (such as an observer or 
-                         * a non-voting follower), respond right away. 
+                         * If it is from a non-voting server (such as an observer or
+                         * a non-voting follower), respond right away.
                          */
                         if(!self.getVotingView().containsKey(response.sid)){
                             Vote current = self.getCurrentVote();
@@ -331,17 +350,18 @@ public class FastLeaderElection implemen
                             case 3:
                                 ackstate = QuorumPeer.ServerState.OBSERVING;
                                 break;
+                            default:
+                                continue;
                             }
 
-                            // Instantiate Notification and set its attributes
-                            Notification n = new Notification();
                             n.leader = rleader;
                             n.zxid = rzxid;
                             n.electionEpoch = relectionEpoch;
                             n.state = ackstate;
-                            n.sid = response.sid;                            
+                            n.sid = response.sid;
                             n.peerEpoch = rpeerepoch;
-                            n.qv = rqv;                              
+                            n.version = version;
+                            n.qv = rqv;
                             /*
                              * Print notification info
                              */
@@ -383,14 +403,14 @@ public class FastLeaderElection implemen
                                 Vote current = self.getCurrentVote();
                                 if(ackstate == QuorumPeer.ServerState.LOOKING){
                                     if(LOG.isDebugEnabled()){
-                                        LOG.debug("Sending new notification. My id =  " +
-                                                self.getId() + " recipient=" +
-                                                response.sid + " zxid=0x" +
-                                                Long.toHexString(current.getZxid()) +
-                                                " leader=" + current.getId() + " config version = " + 
+                                        LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
+                                                self.getId(),
+                                                response.sid,
+                                                Long.toHexString(current.getZxid()),
+                                                current.getId(),
                                                 Long.toHexString(self.getQuorumVerifier().getVersion()));
                                     }
-                                    
+
                                     QuorumVerifier qv = self.getQuorumVerifier();
                                     ToSend notmsg = new ToSend(
                                             ToSend.mType.notification,
@@ -399,7 +419,7 @@ public class FastLeaderElection implemen
                                             current.getElectionEpoch(),
                                             self.getPeerState(),
                                             response.sid,
-                                            current.getPeerEpoch(), 
+                                            current.getPeerEpoch(),
                                             qv.toString().getBytes());
                                     sendqueue.offer(notmsg);
                                 }
@@ -414,9 +434,6 @@ public class FastLeaderElection implemen
             }
         }
 
-    
-  
-
         /**
          * This worker simply dequeues a message to send and
          * and queues it on the manager's queue.
@@ -451,21 +468,13 @@ public class FastLeaderElection implemen
              *
              * @param m     message to send
              */
-            private void process(ToSend m) {
-                byte requestBytes[] = new byte[36 + m.configData.length];
-                ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
-                /*
-                 * Building notification packet to send
-                 */
-
-                requestBuffer.clear();
-                requestBuffer.putInt(m.state.ordinal());
-                requestBuffer.putLong(m.leader);
-                requestBuffer.putLong(m.zxid);
-                requestBuffer.putLong(m.electionEpoch);
-                requestBuffer.putLong(m.peerEpoch);
-                requestBuffer.put(m.configData);
+            void process(ToSend m) {
+                ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
+                                                    m.leader,
+                                                    m.zxid,
+                                                    m.electionEpoch,
+                                                    m.peerEpoch,
+                                                    m.configData);
 
                 manager.toSend(m.sid, requestBuffer);
 
@@ -474,6 +483,8 @@ public class FastLeaderElection implemen
 
         WorkerSender ws;
         WorkerReceiver wr;
+        Thread wsThread = null;
+        Thread wrThread = null;
 
         /**
          * Constructor of class Messenger.
@@ -484,17 +495,23 @@ public class FastLeaderElection implemen
 
             this.ws = new WorkerSender(manager);
 
-            Thread t = new Thread(this.ws,
+            this.wsThread = new Thread(this.ws,
                     "WorkerSender[myid=" + self.getId() + "]");
-            t.setDaemon(true);
-            t.start();
+            this.wsThread.setDaemon(true);
 
             this.wr = new WorkerReceiver(manager);
 
-            t = new Thread(this.wr,
+            this.wrThread = new Thread(this.wr,
                     "WorkerReceiver[myid=" + self.getId() + "]");
-            t.setDaemon(true);
-            t.start();
+            this.wrThread.setDaemon(true);
+        }
+
+        /**
+         * Starts instances of WorkerSender and WorkerReceiver
+         */
+        void start(){
+            this.wsThread.start();
+            this.wrThread.start();
         }
 
         /**
@@ -522,6 +539,55 @@ public class FastLeaderElection implemen
         return logicalclock;
     }
 
+    static ByteBuffer buildMsg(int state,
+            long leader,
+            long zxid,
+            long electionEpoch,
+            long epoch) {
+        byte requestBytes[] = new byte[40];
+        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+        /*
+         * Building notification packet to send, this is called directly only in tests
+         */
+
+        requestBuffer.clear();
+        requestBuffer.putInt(state);
+        requestBuffer.putLong(leader);
+        requestBuffer.putLong(zxid);
+        requestBuffer.putLong(electionEpoch);
+        requestBuffer.putLong(epoch);
+        requestBuffer.putInt(0x1);
+
+        return requestBuffer;
+    }
+
+    static ByteBuffer buildMsg(int state,
+            long leader,
+            long zxid,
+            long electionEpoch,
+            long epoch,
+            byte[] configData) {
+        byte requestBytes[] = new byte[44 + configData.length];
+        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+        /*
+         * Building notification packet to send
+         */
+
+        requestBuffer.clear();
+        requestBuffer.putInt(state);
+        requestBuffer.putLong(leader);
+        requestBuffer.putLong(zxid);
+        requestBuffer.putLong(electionEpoch);
+        requestBuffer.putLong(epoch);
+        requestBuffer.putInt(Notification.CURRENTVERSION);
+        requestBuffer.putInt(configData.length);
+        requestBuffer.put(configData);
+
+        return requestBuffer;
+    }
+
     /**
      * Constructor of FastLeaderElection. It takes two parameters, one
      * is the QuorumPeer object that instantiated this object, and the other
@@ -557,12 +623,17 @@ public class FastLeaderElection implemen
         this.messenger = new Messenger(manager);
     }
 
+    /**
+     * This method starts the sender and receiver threads.
+     */
+    public void start() {
+        this.messenger.start();
+    }
+
     private void leaveInstance(Vote v) {
         if(LOG.isDebugEnabled()){
-            LOG.debug("About to leave FLE instance: leader="
-                + v.getId() + ", zxid=0x" +
-                Long.toHexString(v.getZxid()) + ", my id=" + self.getId()
-                + ", my state=" + self.getPeerState());
+            LOG.debug("About to leave FLE instance: leader={}, zxid=0x{}, my id={}, my state={}",
+                v.getId(), Long.toHexString(v.getZxid()), self.getId(), self.getPeerState());
         }
         recvqueue.clear();
     }
@@ -582,7 +653,6 @@ public class FastLeaderElection implemen
         messenger.halt();
         LOG.debug("FLE is down");
     }
-    
 
     /**
      * Send notifications to all peers upon a change in our vote
@@ -608,14 +678,17 @@ public class FastLeaderElection implemen
     }
 
     private void printNotification(Notification n){
-        LOG.info("Notification: " + n.leader + " (n.leader), 0x"
+        LOG.info("Notification: "
+                + Long.toHexString(n.version) + " (message format version), "
+                + n.leader + " (n.leader), 0x"
                 + Long.toHexString(n.zxid) + " (n.zxid), 0x"
                 + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
                 + " (n.state), " + n.sid + " (n.sid), 0x"
                 + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
-                + self.getPeerState() + " (my state)" + (n.qv!=null ? (Long.toHexString(n.qv.getVersion()) + " (n.config version)"):""));
+                + self.getPeerState() + " (my state)"
+                + (n.qv!=null ? (Long.toHexString(n.qv.getVersion()) + " (n.config version)"):""));
     }
- 
+
 
     /**
      * Check if a pair (server id, zxid) succeeds our
@@ -630,7 +703,7 @@ public class FastLeaderElection implemen
         if(self.getQuorumVerifier().getWeight(newId) == 0){
             return false;
         }
-        
+
         /*
          * We return true if one of the following three cases hold:
          * 1- New epoch is higher
@@ -638,8 +711,8 @@ public class FastLeaderElection implemen
          * 3- New epoch is the same as current epoch, new zxid is the same
          *  as current zxid, but server id is higher.
          */
-        
-        return ((newEpoch > curEpoch) || 
+
+        return ((newEpoch > curEpoch) ||
                 ((newEpoch == curEpoch) &&
                 ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
     }
@@ -700,7 +773,7 @@ public class FastLeaderElection implemen
             else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;
         } else if(logicalclock != electionEpoch) {
             predicate = false;
-        } 
+        }
 
         return predicate;
     }
@@ -776,7 +849,7 @@ public class FastLeaderElection implemen
         	}
         else return Long.MIN_VALUE;
     }
-    
+
     /**
      * Starts a new round of leader election. Whenever our QuorumPeer
      * changes its state to LOOKING, this method is invoked, and it

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1608648&r1=1608647&r2=1608648&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Tue Jul  8 03:25:56 2014
@@ -70,7 +70,7 @@ public class QuorumCnxManager {
     // stale notifications to peers
     static final int SEND_CAPACITY = 1;
 
-    static final int PACKETMAXSIZE = 1024 * 1024; 
+    static final int PACKETMAXSIZE = 1024 * 512;
     /*
      * Maximum number of attempts to connect to a peer
      */

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1608648&r1=1608647&r2=1608648&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Jul  8 03:25:56 2014
@@ -810,7 +810,9 @@ public class QuorumPeer extends ZooKeepe
             QuorumCnxManager.Listener listener = qcm.listener;
             if(listener != null){
                 listener.start();
-                le = new FastLeaderElection(this, qcm);
+                FastLeaderElection fle = new FastLeaderElection(this, qcm);
+                fle.start();
+                le = fle;
             } else {
                 LOG.error("Null listener when initializing cnx manager");
             }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java?rev=1608648&r1=1608647&r2=1608648&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java Tue Jul  8 03:25:56 2014
@@ -23,7 +23,9 @@ import org.apache.zookeeper.server.quoru
 
 public class Vote {
     
-    public Vote(long id, long zxid) {
+    public Vote(long id,
+                    long zxid) {
+        this.version = 0x0;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = -1;
@@ -31,7 +33,10 @@ public class Vote {
         this.state = ServerState.LOOKING;
     }
     
-    public Vote(long id, long zxid, long peerEpoch) {
+    public Vote(long id,
+                    long zxid,
+                    long peerEpoch) {
+        this.version = 0x0;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = -1;
@@ -39,7 +44,11 @@ public class Vote {
         this.state = ServerState.LOOKING;
     }
 
-    public Vote(long id, long zxid, long electionEpoch, long peerEpoch) {
+    public Vote(long id,
+                    long zxid,
+                    long electionEpoch,
+                    long peerEpoch) {
+        this.version = 0x0;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = electionEpoch;
@@ -47,7 +56,13 @@ public class Vote {
         this.state = ServerState.LOOKING;
     }
     
-    public Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
+    public Vote(int version,
+                    long id,
+                    long zxid,
+                    long electionEpoch,
+                    long peerEpoch,
+                    ServerState state) {
+        this.version = version;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = electionEpoch;
@@ -55,6 +70,21 @@ public class Vote {
         this.peerEpoch = peerEpoch;
     }
     
+    public Vote(long id,
+                    long zxid,
+                    long electionEpoch,
+                    long peerEpoch,
+                    ServerState state) {
+        this.id = id;
+        this.zxid = zxid;
+        this.electionEpoch = electionEpoch;
+        this.state = state;
+        this.peerEpoch = peerEpoch;
+        this.version = 0x0;
+    }
+
+    final private int version;
+
     final private long id;
     
     final private long zxid;
@@ -63,6 +93,10 @@ public class Vote {
     
     final private long peerEpoch;
     
+    public int getVersion() {
+        return version;
+    }
+
     public long getId() {
         return id;
     }
@@ -91,7 +125,10 @@ public class Vote {
             return false;
         }
         Vote other = (Vote) o;
-        return (id == other.id && zxid == other.zxid && electionEpoch == other.electionEpoch && peerEpoch == other.peerEpoch);
+        return (id == other.id
+                    && zxid == other.zxid
+                    && electionEpoch == other.electionEpoch
+                    && peerEpoch == other.peerEpoch);
 
     }
 

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java?rev=1608648&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java Tue Jul  8 03:25:56 2014
@@ -0,0 +1,150 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FLEBackwardElectionRoundTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class); // NOTE(review): logger is created for FLELostMessageTest, not this class -- looks like a copy/paste slip; confirm
+
+    int count; // number of quorum members, set in setUp()
+    HashMap<Long,QuorumServer> peers; // server id -> QuorumServer, filled in by the test
+    File tmpdir[]; // one temporary directory per server id
+    int port[]; // per-server port handed to the QuorumPeer constructor
+
+    QuorumCnxManager cnxManagers[]; // connection managers of the mock servers; halted in tearDown()
+
+    @Before
+    public void setUp() throws Exception {
+        count = 3; // the test uses server 0 as the real peer and servers 1 and 2 as mocks
+
+        peers = new HashMap<Long,QuorumServer>(count);
+        tmpdir = new File[count];
+        port = new int[count];
+        cnxManagers = new QuorumCnxManager[count - 1]; // one slot per mock server
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        for(int i = 0; i < (count - 1); i++){
+            if(cnxManagers[i] != null){ // a slot is still null if the test stopped before creating that manager
+                cnxManagers[i].halt();
+            }
+        }
+    }
+
+    /**
+     * This test is checking the following case. A server S is
+     * currently LOOKING and it receives notifications from
+     * a quorum indicating they are following S. The election
+     * round E of S is higher than the election round E' in the
+     * notification messages, so S becomes the leader and sets
+     * its epoch back to E'. In the meanwhile, one or more
+     * followers turn to LOOKING and elect S in election round E.
+     * Having leader and followers with different election rounds
+     * might prevent other servers from electing a leader because
+     * they can't get a consistent set of notifications from a
+     * quorum.
+     *
+     * The test runs leader election on server 0 twice, each time
+     * sending it the same FOLLOWING notification from mock servers
+     * 1 and 2. It fails if the second election thread is no longer
+     * alive after the 5 second join timeout.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-1514">ZOOKEEPER-1514</a>
+     *
+     * @throws Exception
+     */
+
+    @Test
+    public void testBackwardElectionRound() throws Exception {
+        LOG.info("TestLE: {}, {}", getTestName(), count);
+        for(int i = 0; i < count; i++) {
+            int clientport = PortAssignment.unique();
+            peers.put(Long.valueOf(i),
+                    new QuorumServer(i,
+                            new InetSocketAddress(clientport),
+                            new InetSocketAddress(PortAssignment.unique())));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = clientport;
+        }
+
+        ByteBuffer initialMsg = FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0,
0, 1);
+
+        /*
+         * Start server 0: the real peer under test. Its leader
+         * election runs in a separate LEThread.
+         */
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000,
2, 2);
+        peer.startLeaderElection();
+        FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 0);
+        thread.start();
+
+        /*
+         * Start mock server 1: only a QuorumCnxManager with its
+         * listener, used to send initialMsg (state FOLLOWING,
+         * leader 0, zxid 0, epoch 1) to server 0.
+         */
+        QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1,
1000, 2, 2);
+        cnxManagers[0] = new QuorumCnxManager(mockPeer);
+        cnxManagers[0].listener.start();
+
+        cnxManagers[0].toSend(0l, initialMsg);
+
+        /*
+         * Start mock server 2, which sends the same notification
+         * to server 0.
+         */
+        mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
+        cnxManagers[1] = new QuorumCnxManager(mockPeer);
+        cnxManagers[1].listener.start();
+
+        cnxManagers[1].toSend(0l, initialMsg);
+
+        /*
+         * Wait up to 5 seconds for the first election thread, then
+         * run another instance of leader election on the same peer.
+         */
+        thread.join(5000);
+        thread = new FLETestUtils.LEThread(peer, 0);
+        thread.start();
+
+        /*
+         * Send the same messages, this time should not make 0 the leader.
+         * The election thread is therefore expected to still be running
+         * after the 5 second join below; if it has ended, the test fails.
+         */
+        cnxManagers[0].toSend(0l, initialMsg);
+        cnxManagers[1].toSend(0l, initialMsg);
+
+        thread.join(5000);
+
+        if (!thread.isAlive()) {
+            Assert.fail("Should not have joined");
+        }
+
+    }
+}

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java?rev=1608648&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
(added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
Tue Jul  8 03:25:56 2014
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FLELostMessageTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
+
+    int count; // number of quorum members, set in setUp()
+    HashMap<Long,QuorumServer> peers; // server id -> QuorumServer, filled in by the test
+    File tmpdir[]; // one temporary directory per server id
+    int port[]; // per-server port handed to the QuorumPeer constructor
+
+    QuorumCnxManager cnxManager; // connection manager of the mock server; assigned in mockServer()
+
+    @Before
+    public void setUp() throws Exception {
+        count = 3;
+
+        peers = new HashMap<Long,QuorumServer>(count);
+        tmpdir = new File[count];
+        port = new int[count];
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cnxManager.halt(); // NOTE(review): cnxManager is only assigned in mockServer(); it would be null here if the test stopped before that call
+    }
+
+    @Test
+    public void testLostMessage() throws Exception {
+        LOG.info("TestLE: {}, {}", getTestName(), count);
+        for(int i = 0; i < count; i++) {
+            int clientport = PortAssignment.unique();
+            peers.put(Long.valueOf(i),
+                    new QuorumServer(i,
+                            new InetSocketAddress(clientport),
+                            new InetSocketAddress(PortAssignment.unique())));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = clientport;
+        }
+
+        /*
+         * Start server 1 (the peer built from index 1) and run its
+         * leader election in a separate LEThread.
+         */
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000,
2, 2);
+        peer.startLeaderElection();
+        FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 1);
+        thread.start();
+
+        /*
+         * Start mock server 0, then wait up to 5 seconds for the
+         * election thread; the test fails if it is still alive.
+         */
+        mockServer();
+        thread.join(5000);
+        if (thread.isAlive()) {
+            Assert.fail("Threads didn't join");
+        }
+    }
+
+    void mockServer() throws InterruptedException, IOException {
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000,
2, 2);
+        cnxManager = new QuorumCnxManager(peer);
+        cnxManager.listener.start();
+
+        cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0,
0)); // first notification to server 1: state LOOKING, leader 0, zxid 0, epoch 0
+        cnxManager.recvQueue.take(); // presumably blocks until a message from server 1 arrives -- confirm against QuorumCnxManager
+        cnxManager.toSend(1L, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 1,
0, 0)); // second notification to server 1: state FOLLOWING, leader 1, zxid 0, epoch 0
+    }
+}

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java?rev=1608648&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java Tue
Jul  8 03:25:56 2014
@@ -0,0 +1,84 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.nio.ByteBuffer;
+
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Assert;
+
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+
+public class FLETestUtils {
+    protected static final Logger LOG = LoggerFactory.getLogger(FLETestUtils.class);
+
+    /*
+     * Thread to run an instance of leader election for
+     * a given quorum peer. The index "i" is only used to
+     * label log and failure messages.
+     */
+    static class LEThread extends Thread {
+        private int i;
+        private QuorumPeer peer;
+
+        LEThread(QuorumPeer peer, int i) {
+            this.i = i;
+            this.peer = peer;
+            LOG.info("Constructor: {}", getName());
+
+        }
+
+        public void run() {
+            try {
+                Vote v = null;
+                peer.setPeerState(ServerState.LOOKING); // reset the peer to LOOKING before each election run
+                LOG.info("Going to call leader election: {}", i);
+                v = peer.getElectionAlg().lookForLeader();
+
+                if (v == null) {
+                    Assert.fail("Thread " + i + " got a null vote");
+                }
+
+                /*
+                 * A real zookeeper would take care of setting the current vote. Here
+                 * we do it manually.
+                 */
+                peer.setCurrentVote(v);
+
+                LOG.info("Finished election: {}, {}", i, v.getId());
+
+                Assert.assertTrue("State is not leading.", peer.getPeerState() == ServerState.LEADING);
+            } catch (Exception e) {
+                e.printStackTrace(); // NOTE(review): the exception is only printed here, not rethrown
+            }
+            LOG.info("Joining");
+        }
+    }
+
+    /*
+     * Creates a leader election notification message from the
+     * given state, leader, zxid and epoch. The fourth argument
+     * passed to FastLeaderElection.buildMsg is fixed at 1
+     * (presumably the election epoch -- confirm against buildMsg).
+     */
+    static ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
+        return FastLeaderElection.buildMsg(state, leader, zxid, 1, epoch);
+    }
+
+}

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java?rev=1608648&r1=1608647&r2=1608648&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java Tue Jul 
8 03:25:56 2014
@@ -152,8 +152,6 @@ public class FLENewEpochTest extends ZKT
       @Test
       public void testLENewEpoch() throws Exception {
 
-          FastLeaderElection le[] = new FastLeaderElection[count];
-
           LOG.info("TestLE: " + getTestName()+ ", " + count);
           for(int i = 0; i < count; i++) {
               peers.put(Long.valueOf(i),
@@ -166,7 +164,7 @@ public class FLENewEpochTest extends ZKT
               port[i] = PortAssignment.unique();
           }
 
-          for(int i = 1; i < le.length; i++) {
+          for(int i = 1; i < count; i++) {
               QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
               peer.startLeaderElection();
               LEThread thread = new LEThread(peer, i);

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java?rev=1608648&r1=1608647&r2=1608648&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java Tue Jul
 8 03:25:56 2014
@@ -78,6 +78,7 @@ public class FLEPredicateTest extends ZK
                                         PortAssignment.unique(), 3, 0, 1000, 2, 2);
         
             MockFLE mock = new MockFLE(peer);
+            mock.start();
             
             /*
              * Lower epoch must return false

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java?rev=1608648&r1=1608647&r2=1608648&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java Tue Jul  8 03:25:56
2014
@@ -297,7 +297,6 @@ public class FLETest extends ZKTestCase 
      * @throws Exception
      */
     private void runElection(int rounds) throws Exception {
-        FastLeaderElection le[] = new FastLeaderElection[count];
         ConcurrentHashMap<Long, HashSet<Integer> > quora = 
             new ConcurrentHashMap<Long, HashSet<Integer> >();
 
@@ -322,7 +321,7 @@ public class FLETest extends ZKTestCase 
         /*
          * Start one LEThread for each peer we want to run.
          */
-        for(int i = 0; i < le.length; i++) {
+        for(int i = 0; i < count; i++) {
             QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i],
                     port[i], 3, i, 1000, 2, 2);
             peer.startLeaderElection();

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java?rev=1608648&r1=1608647&r2=1608648&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java Tue Jul
 8 03:25:56 2014
@@ -140,8 +140,6 @@ public class FLEZeroWeightTest extends Z
 
     @Test
     public void testZeroWeightQuorum() throws Exception {
-        FastLeaderElection le[] = new FastLeaderElection[count];
-
         LOG.info("TestZeroWeightQuorum: " + getTestName()+ ", " + count);
         for(int i = 0; i < count; i++) {
             InetSocketAddress addr1 = new InetSocketAddress("127.0.0.1",PortAssignment.unique());
@@ -153,7 +151,7 @@ public class FLEZeroWeightTest extends Z
             tmpdir[i] = ClientBase.createTmpDir();
         }
 
-        for(int i = 0; i < le.length; i++) {
+        for(int i = 0; i < count; i++) {
             QuorumHierarchical hq = new QuorumHierarchical(qp);
             QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, hq);
             peer.startLeaderElection();

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java?rev=1608648&r1=1608647&r2=1608648&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java Tue Jul
 8 03:25:56 2014
@@ -37,6 +37,7 @@ import org.apache.zookeeper.PortAssignme
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.FLELostMessageTest;
 import org.apache.zookeeper.server.quorum.LeaderElection;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.Vote;



Mime
View raw message