hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ph...@apache.org
Subject svn commit: r960675 - in /hadoop/zookeeper/branches/branch-3.3: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
Date Mon, 05 Jul 2010 19:53:06 GMT
Author: phunt
Date: Mon Jul  5 19:53:05 2010
New Revision: 960675

URL: http://svn.apache.org/viewvc?rev=960675&view=rev
Log:
ZOOKEEPER-789. Improve FLE log messages

Modified:
    hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
    hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java

Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=960675&r1=960674&r2=960675&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Mon Jul  5 19:53:05 2010
@@ -1,3 +1,14 @@
+Branch 3.3
+
+Backward compatible changes:
+
+BUGFIXES:
+
+IMPROVEMENTS:
+
+  ZOOKEEPER-789. Improve FLE log messages (flavio via phunt)
+
+
 Release 3.3.1 - 2010-05-11
 Backward compatible changes:
 

Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=960675&r1=960674&r2=960675&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
(original)
+++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
Mon Jul  5 19:53:05 2010
@@ -35,11 +35,11 @@ import org.apache.zookeeper.server.quoru
 
 
 /**
- * Implementation of leader election using TCP. It uses an object of the class 
+ * Implementation of leader election using TCP. It uses an object of the class
  * QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based
- * as with the other UDP implementations. 
- * 
- * There are a few parameters that can be tuned to change its behavior. First, 
+ * as with the other UDP implementations.
+ *
+ * There are a few parameters that can be tuned to change its behavior. First,
  * finalizeWait determines the amount of time to wait until deciding upon a leader.
  * This is part of the leader election algorithm.
  */
@@ -56,30 +56,30 @@ public class FastLeaderElection implemen
     final static int finalizeWait = 200;
 
 
-	/**
-	 * Upper bound on the amount of time between two consecutive
-	 * notification checks. This impacts the amount of time to get
-	 * the system up again after long partitions. Currently 60 seconds. 
-	 */
-	
+    /**
+     * Upper bound on the amount of time between two consecutive
+     * notification checks. This impacts the amount of time to get
+     * the system up again after long partitions. Currently 60 seconds.
+     */
+
     final static int maxNotificationInterval = 60000;
-    
-	/**
-	 * Connection manager. Fast leader election uses TCP for 
-	 * communication between peers, and QuorumCnxManager manages
-	 * such connections. 
-	 */
-	
-	QuorumCnxManager manager;
-
-	
-	/**
-	 * Notifications are messages that let other peers know that
-	 * a given peer has changed its vote, either because it has
-	 * joined leader election or because it learned of another 
-	 * peer with higher zxid or same zxid and higher server id
-	 */
-	
+
+    /**
+     * Connection manager. Fast leader election uses TCP for
+     * communication between peers, and QuorumCnxManager manages
+     * such connections.
+     */
+
+    QuorumCnxManager manager;
+
+
+    /**
+     * Notifications are messages that let other peers know that
+     * a given peer has changed its vote, either because it has
+     * joined leader election or because it learned of another
+     * peer with higher zxid or same zxid and higher server id
+     */
+
     static public class Notification {
         /*
          * Proposed leader
@@ -100,7 +100,7 @@ public class FastLeaderElection implemen
          * current state of sender
          */
         QuorumPeer.ServerState state;
-        
+
         /*
          * Address of sender
          */
@@ -113,22 +113,22 @@ public class FastLeaderElection implemen
      * of reception of notification.
      */
     static public class ToSend {
-    	static enum mType {crequest, challenge, notification, ack}
-        
-        ToSend(mType type, 
-        		long leader, 
-        		long zxid, 
-        		long epoch, 
-        		ServerState state,
-        		long sid) {
-        
-        	this.leader = leader;
-        	this.zxid = zxid;
-        	this.epoch = epoch;
-        	this.state = state;
-        	this.sid = sid;
+        static enum mType {crequest, challenge, notification, ack}
+
+        ToSend(mType type,
+                long leader,
+                long zxid,
+                long epoch,
+                ServerState state,
+                long sid) {
+
+            this.leader = leader;
+            this.zxid = zxid;
+            this.epoch = epoch;
+            this.state = state;
+            this.sid = sid;
         }
-        
+
         /*
          * Proposed leader in the case of notification
          */
@@ -148,7 +148,7 @@ public class FastLeaderElection implemen
          * Current state;
          */
         QuorumPeer.ServerState state;
-        
+
         /*
          * Address of recipient
          */
@@ -164,17 +164,17 @@ public class FastLeaderElection implemen
      * functionality of each is obvious from the name. Each of these
      * spawns a new thread.
      */
-    
+
     private class Messenger {
-        
+
         /**
          * Receives messages from instance of QuorumCnxManager on
          * method run(), and processes such messages.
          */
-        
+
         class WorkerReceiver implements Runnable {
             volatile boolean stop;
-        	QuorumCnxManager manager;
+            QuorumCnxManager manager;
 
             WorkerReceiver(QuorumCnxManager manager) {
                 this.stop = false;
@@ -182,147 +182,154 @@ public class FastLeaderElection implemen
             }
 
             public void run() {
-                
-            	Message response;
-            	while (!stop) {
+
+                Message response;
+                while (!stop) {
                     // Sleeps on receive
-            		try{
-            			response = manager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
-            			if(response == null) continue;
-
-            			/*
-            			 * If it is from an observer, respond right away.
-            			 * Note that the following predicate assumes that
-            			 * if a server is not a follower, then it must be
-            			 * an observer. If we ever have any other type of
-            			 * learner in the future, we'll have to change the
-            			 * way we check for observers.
-            			 */
-            			if(!self.getVotingView().containsKey(response.sid)){
-            			    Vote current = self.getCurrentVote();
-            			    ToSend notmsg = new ToSend(ToSend.mType.notification, 
-            			            current.id, 
+                    try{
+                        response = manager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+                        if(response == null) continue;
+
+                        /*
+                         * If it is from an observer, respond right away.
+                         * Note that the following predicate assumes that
+                         * if a server is not a follower, then it must be
+                         * an observer. If we ever have any other type of
+                         * learner in the future, we'll have to change the
+                         * way we check for observers.
+                         */
+                        if(!self.getVotingView().containsKey(response.sid)){
+                            Vote current = self.getCurrentVote();
+                            ToSend notmsg = new ToSend(ToSend.mType.notification,
+                                    current.id,
                                     current.zxid,
-            	                    logicalclock,
-            	                    self.getPeerState(),
-            	                    response.sid);
-
-            	            sendqueue.offer(notmsg);
-            			} else {           			
-            			    // Receive new message
-            			    if (LOG.isDebugEnabled()) {
-            			        LOG.debug("Receive new notification message. My id = " 
-            			                + self.getId());
-            			    }
-            			    
-            			    if (response.buffer.capacity() < 28) {
-            			        LOG.error("Got a short response: "
-            			                + response.buffer.capacity());
-            			        continue;
-            			    }
-            			    response.buffer.clear();
-
-            			    // State of peer that sent this message
-            			    QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
-            			    switch (response.buffer.getInt()) {
-            			    case 0:
-            			        ackstate = QuorumPeer.ServerState.LOOKING;
-            			        break;
-            			    case 1:
-            			        ackstate = QuorumPeer.ServerState.FOLLOWING;
-            			        break;
-            			    case 2:
-            			        ackstate = QuorumPeer.ServerState.LEADING;
-            			        break;
-            			    case 3:
+                                    logicalclock,
+                                    self.getPeerState(),
+                                    response.sid);
+
+                            sendqueue.offer(notmsg);
+                        } else {
+                            // Receive new message
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Receive new notification message. My id = "
+                                        + self.getId());
+                            }
+
+                            if (response.buffer.capacity() < 28) {
+                                LOG.error("Got a short response: "
+                                        + response.buffer.capacity());
+                                continue;
+                            }
+                            response.buffer.clear();
+
+                            // State of peer that sent this message
+                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
+                            switch (response.buffer.getInt()) {
+                            case 0:
+                                ackstate = QuorumPeer.ServerState.LOOKING;
+                                break;
+                            case 1:
+                                ackstate = QuorumPeer.ServerState.FOLLOWING;
+                                break;
+                            case 2:
+                                ackstate = QuorumPeer.ServerState.LEADING;
+                                break;
+                            case 3:
                                 ackstate = QuorumPeer.ServerState.OBSERVING;
                                 break;
-            			    }
-            			    
-            			    // Instantiate Notification and set its attributes
-            			    Notification n = new Notification();
-            			    n.leader = response.buffer.getLong();
-            			    n.zxid = response.buffer.getLong();
-            			    n.epoch = response.buffer.getLong();
-            			    n.state = ackstate;
-            			    n.sid = response.sid;
-
-            			    /*
-            			     * If this server is looking, then send proposed leader
-            			     */
-
-            			    if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
-            			        recvqueue.offer(n);
-
-            			        /*
-            			         * Send a notification back if the peer that sent this
-            			         * message is also looking and its logical clock is 
-            			         * lagging behind.
-            			         */
-            			        if((ackstate == QuorumPeer.ServerState.LOOKING)
-            			                && (n.epoch < logicalclock)){
-            			            Vote v = getVote();
-            			            ToSend notmsg = new ToSend(ToSend.mType.notification, 
-            			                    v.id, 
-            			                    v.zxid,
-            			                    logicalclock,
-            			                    self.getPeerState(),
-            			                    response.sid);
-            			            sendqueue.offer(notmsg);
-            			        }
-            			    } else {
-            			        /*
-            			         * If this server is not looking, but the one that sent the ack
-            			         * is looking, then send back what it believes to be the leader.
-            			         */
-            			        Vote current = self.getCurrentVote();
-            			        if(ackstate == QuorumPeer.ServerState.LOOKING){
-            			            if(LOG.isDebugEnabled()){
-            			                LOG.debug("Sending new notification. My id =  " +
-            			                        self.getId() + ", Recipient = " +
-            			                        response.sid);
-            			            }
-            			            ToSend notmsg = new ToSend(
-            			                    ToSend.mType.notification, 
-            			                    current.id, 
-            			                    current.zxid,
-            			                    logicalclock,
-            			                    self.getPeerState(),
-            			                    response.sid);
-            			            sendqueue.offer(notmsg);
-            			        }
-            			    }
-            			}
-            		} catch (InterruptedException e) {
-            			System.out.println("Interrupted Exception while waiting for new message" +
-            					e.toString());
-            		}
-            	}
+                            }
+
+                            // Instantiate Notification and set its attributes
+                            Notification n = new Notification();
+                            n.leader = response.buffer.getLong();
+                            n.zxid = response.buffer.getLong();
+                            n.epoch = response.buffer.getLong();
+                            n.state = ackstate;
+                            n.sid = response.sid;
+
+                            /*
+                             * Print notification info
+                             */
+                            if(LOG.isInfoEnabled()){
+                                printNotification(n);
+                            }
+
+                            /*
+                             * If this server is looking, then send proposed leader
+                             */
+
+                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
+                                recvqueue.offer(n);
+
+                                /*
+                                 * Send a notification back if the peer that sent this
+                                 * message is also looking and its logical clock is
+                                 * lagging behind.
+                                 */
+                                if((ackstate == QuorumPeer.ServerState.LOOKING)
+                                        && (n.epoch < logicalclock)){
+                                    Vote v = getVote();
+                                    ToSend notmsg = new ToSend(ToSend.mType.notification,
+                                            v.id,
+                                            v.zxid,
+                                            logicalclock,
+                                            self.getPeerState(),
+                                            response.sid);
+                                    sendqueue.offer(notmsg);
+                                }
+                            } else {
+                                /*
+                                 * If this server is not looking, but the one that sent the
ack
+                                 * is looking, then send back what it believes to be the
leader.
+                                 */
+                                Vote current = self.getCurrentVote();
+                                if(ackstate == QuorumPeer.ServerState.LOOKING){
+                                    if(LOG.isDebugEnabled()){
+                                        LOG.debug("Sending new notification. My id =  " +
+                                                self.getId() + ", Recipient = " +
+                                                response.sid);
+                                    }
+                                    ToSend notmsg = new ToSend(
+                                            ToSend.mType.notification,
+                                            current.id,
+                                            current.zxid,
+                                            logicalclock,
+                                            self.getPeerState(),
+                                            response.sid);
+                                    sendqueue.offer(notmsg);
+                                }
+                            }
+                        }
+                    } catch (InterruptedException e) {
+                        System.out.println("Interrupted Exception while waiting for new message"
+
+                                e.toString());
+                    }
+                }
                 LOG.info("WorkerReceiver is down");
             }
         }
 
-        
+
         /**
          * This worker simply dequeues a message to send and
-         * and queues it on the manager's queue. 
+         * and queues it on the manager's queue.
          */
-        
+
         class WorkerSender implements Runnable {
-        	volatile boolean stop;
+            volatile boolean stop;
             QuorumCnxManager manager;
 
-            WorkerSender(QuorumCnxManager manager){ 
+            WorkerSender(QuorumCnxManager manager){
                 this.stop = false;
                 this.manager = manager;
             }
-            
+
             public void run() {
                 while (!stop) {
                     try {
                         ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                         if(m == null) continue;
-                        
+
                         process(m);
                     } catch (InterruptedException e) {
                         break;
@@ -333,25 +340,25 @@ public class FastLeaderElection implemen
 
             /**
              * Called by run() once there is a new message to send.
-             * 
+             *
              * @param m     message to send
              */
             private void process(ToSend m) {
                 byte requestBytes[] = new byte[28];
-                ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);  
-                
+                ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
                 /*
                  * Building notification packet to send
                  */
-                    
+
                 requestBuffer.clear();
                 requestBuffer.putInt(m.state.ordinal());
                 requestBuffer.putLong(m.leader);
                 requestBuffer.putLong(m.zxid);
                 requestBuffer.putLong(m.epoch);
-                
+
                 manager.toSend(m.sid, requestBuffer);
-                  
+
             }
         }
 
@@ -362,32 +369,32 @@ public class FastLeaderElection implemen
             return (sendqueue.isEmpty() || recvqueue.isEmpty());
         }
 
-        
+
         WorkerSender ws;
         WorkerReceiver wr;
-        
+
         /**
          * Constructor of class Messenger.
-         * 
+         *
          * @param manager   Connection manager
          */
         Messenger(QuorumCnxManager manager) {
 
             this.ws = new WorkerSender(manager);
-            
+
             Thread t = new Thread(this.ws,
-            		"WorkerSender Thread");
+                    "WorkerSender Thread");
             t.setDaemon(true);
             t.start();
 
             this.wr = new WorkerReceiver(manager);
-        
+
             t = new Thread(this.wr,
-                    				"WorkerReceiver Thread");
+                                    "WorkerReceiver Thread");
             t.setDaemon(true);
             t.start();
         }
-        
+
         /**
          * Stops instances of WorkerSender and WorkerReceiver
          */
@@ -398,7 +405,7 @@ public class FastLeaderElection implemen
 
     }
 
-    QuorumPeer self;    
+    QuorumPeer self;
     Messenger messenger;
     volatile long logicalclock; /* Election instance */
     long proposedLeader;
@@ -409,33 +416,33 @@ public class FastLeaderElection implemen
      * Returns the current vlue of the logical clock counter
      */
     public long getLogicalClock(){
-	return logicalclock;
+    return logicalclock;
     }
-    
+
     /**
      * Constructor of FastLeaderElection. It takes two parameters, one
      * is the QuorumPeer object that instantiated this object, and the other
-     * is the connection manager. Such an object should be created only once 
+     * is the connection manager. Such an object should be created only once
      * by each peer during an instance of the ZooKeeper service.
-     * 
+     *
      * @param self  QuorumPeer that created this object
      * @param manager   Connection manager
      */
     public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
-    	this.stop = false;
+        this.stop = false;
         this.manager = manager;
-    	starter(self, manager);
+        starter(self, manager);
     }
-    
+
     /**
      * This method is invoked by the constructor. Because it is a
      * part of the starting procedure of the object that must be on
      * any constructor of this class, it is probably best to keep as
-     * a separate method. As we have a single constructor currently, 
+     * a separate method. As we have a single constructor currently,
      * it is not strictly necessary to have it separate.
-     * 
+     *
      * @param self      QuorumPeer that created this object
-     * @param manager   Connection manager   
+     * @param manager   Connection manager
      */
     private void starter(QuorumPeer self, QuorumCnxManager manager) {
         this.self = self;
@@ -450,11 +457,11 @@ public class FastLeaderElection implemen
     private void leaveInstance() {
         recvqueue.clear();
     }
-    
+
     public QuorumCnxManager getCnxManager(){
-    	return manager;
+        return manager;
     }
-    
+
     volatile boolean stop;
     public void shutdown(){
         stop = true;
@@ -465,7 +472,7 @@ public class FastLeaderElection implemen
         LOG.debug("FLE is down");
     }
 
-    
+
     /**
      * Send notifications to all peers upon a change in our vote
      */
@@ -473,9 +480,9 @@ public class FastLeaderElection implemen
         for (QuorumServer server : self.getVotingView().values()) {
             long sid = server.id;
 
-            ToSend notmsg = new ToSend(ToSend.mType.notification, 
-            		proposedLeader, 
-            		proposedZxid,
+            ToSend notmsg = new ToSend(ToSend.mType.notification,
+                    proposedLeader,
+                    proposedZxid,
                     logicalclock,
                     QuorumPeer.ServerState.LOOKING,
                     sid);
@@ -484,10 +491,18 @@ public class FastLeaderElection implemen
         }
     }
 
+
+    private void printNotification(Notification n){
+        LOG.info("Notification: " + n.leader + " (n.leader), " + n.zxid +
+                " (n.zxid), " + n.epoch + " (n.round), " + n.state +
+                " (n.state), " + n.sid + " (n.sid), " + self.getPeerState() +
+                " (my state)");
+    }
+
     /**
      * Check if a pair (server id, zxid) succeeds our
      * current vote.
-     * 
+     *
      * @param id    Server identifier
      * @param zxid  Last zxid observed by the issuer of this vote
      */
@@ -496,7 +511,7 @@ public class FastLeaderElection implemen
         if(self.getQuorumVerifier().getWeight(newId) == 0){
             return false;
         }
-        
+
         if ((newZxid > curZxid)
                 || ((newZxid == curZxid) && (newId > curId)))
             return true;
@@ -508,17 +523,17 @@ public class FastLeaderElection implemen
     /**
      * Termination predicate. Given a set of votes, determines if
      * have sufficient to declare the end of the election round.
-     * 
+     *
      *  @param votes    Set of votes
      *  @param l        Identifier of the vote received last
      *  @param zxid     zxid of the the vote received last
      */
     private boolean termPredicate(
-            HashMap<Long, Vote> votes, 
+            HashMap<Long, Vote> votes,
             Vote vote) {
 
         HashSet<Long> set = new HashSet<Long>();
-        
+
         /*
          * First make the views consistent. Sometimes peers will have
          * different zxids for a server depending on timing.
@@ -528,7 +543,7 @@ public class FastLeaderElection implemen
                 set.add(entry.getKey());
             }
         }
-                      
+
         if(self.getQuorumVerifier().containsQuorum(set))
             return true;
         else
@@ -537,12 +552,12 @@ public class FastLeaderElection implemen
     }
 
     /**
-     * In the case there is a leader elected, and a quorum supporting 
+     * In the case there is a leader elected, and a quorum supporting
      * this leader, we have to check if the leader has voted and acked
      * that it is leading. We need this check to avoid that peers keep
      * electing over and over a peer that has crashed and it is no
      * longer leading.
-     * 
+     *
      * @param votes set of votes
      * @param   leader  leader id
      * @param   epoch   epoch id
@@ -551,39 +566,38 @@ public class FastLeaderElection implemen
             HashMap<Long, Vote> votes,
             long leader,
             long epoch){
-        
+
         boolean predicate = true;
 
         /*
          * If everyone else thinks I'm the leader, I must be the leader.
          * The other two checks are just for the case in which I'm not the
-         * leader. If I'm not the leader and I haven't received a message 
-         * from leader stating that it is leading, then predicate is false.          
+         * leader. If I'm not the leader and I haven't received a message
+         * from leader stating that it is leading, then predicate is false.
          */
-        
+
         if(leader != self.getId()){
             if(votes.get(leader) == null) predicate = false;
             else if(votes.get(leader).state != ServerState.LEADING) predicate = false;
         }
-        
-        //LOG.info("Leader predicate: " + predicate);
+
         return predicate;
     }
-    
+
     synchronized void updateProposal(long leader, long zxid){
         proposedLeader = leader;
         proposedZxid = zxid;
     }
-    
+
     synchronized Vote getVote(){
         return new Vote(proposedLeader, proposedZxid);
     }
-    
+
     /**
-     * A learning state can be either FOLLOWING or OBSERVING. 
-     * This method simply decides which one depending on the 
+     * A learning state can be either FOLLOWING or OBSERVING.
+     * This method simply decides which one depending on the
      * role of the server.
-     * 
+     *
      * @return ServerState
      */
     private ServerState learningState(){
@@ -591,15 +605,15 @@ public class FastLeaderElection implemen
             LOG.debug("I'm a participant: " + self.getId());
             return ServerState.FOLLOWING;
         }
-        else{ 
+        else{
             LOG.debug("I'm an observer: " + self.getId());
             return ServerState.OBSERVING;
         }
     }
-    
+
     /**
      * Returns the initial vote value of server identifier.
-     * 
+     *
      * @return long
      */
     private long getInitId(){
@@ -607,10 +621,10 @@ public class FastLeaderElection implemen
             return self.getId();
         else return Long.MIN_VALUE;
     }
-    
+
     /**
      * Returns initial last logged zxid.
-     * 
+     *
      * @return long
      */
     private long getInitLastLoggedZxid(){
@@ -618,17 +632,17 @@ public class FastLeaderElection implemen
             return self.getLastLoggedZxid();
         else return Long.MIN_VALUE;
     }
-    
+
     /**
-     * Starts a new round of leader election. Whenever our QuorumPeer 
-     * changes its state to LOOKING, this method is invoked, and it 
+     * Starts a new round of leader election. Whenever our QuorumPeer
+     * changes its state to LOOKING, this method is invoked, and it
      * sends notifications to all other peers.
      */
     public Vote lookForLeader() throws InterruptedException {
         try {
             self.jmxLeaderElectionBean = new LeaderElectionBean();
             MBeanRegistry.getInstance().register(
-                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);        
+                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
         } catch (Exception e) {
             LOG.warn("Failed to register with JMX", e);
             self.jmxLeaderElectionBean = null;
@@ -636,24 +650,24 @@ public class FastLeaderElection implemen
 
         try {
             HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
-    
+
             HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
-    
+
             int notTimeout = finalizeWait;
-            
+
             synchronized(this){
                 logicalclock++;
                     updateProposal(getInitId(), getInitLastLoggedZxid());
             }
-            
+
             LOG.info("New election. My id =  " + self.getId() +
                     ", Proposed zxid = " + proposedZxid);
             sendNotifications();
-    
+
             /*
              * Loop in which we exchange notifications until we find a leader
              */
-    
+
             while ((self.getPeerState() == ServerState.LOOKING) &&
                     (!stop)){
                 /*
@@ -662,34 +676,30 @@ public class FastLeaderElection implemen
                  */
                 Notification n = recvqueue.poll(notTimeout,
                         TimeUnit.MILLISECONDS);
-                
+
                 /*
                  * Sends more notifications if haven't received enough.
                  * Otherwise processes new notification.
                  */
                 if(n == null){
-                	if(manager.haveDelivered()){
-                		sendNotifications();
-                	} else {
-                	    manager.connectAll();
-                	}
-                	
-                	/*
-                	 * Exponential backoff
-                	 */
-                	int tmpTimeOut = notTimeout*2;
-                	notTimeout = (tmpTimeOut < maxNotificationInterval?
-                	        tmpTimeOut : maxNotificationInterval);
-                	LOG.info("Notification time out: " + notTimeout);
+                    if(manager.haveDelivered()){
+                        sendNotifications();
+                    } else {
+                        manager.connectAll();
+                    }
+
+                    /*
+                     * Exponential backoff
+                     */
+                    int tmpTimeOut = notTimeout*2;
+                    notTimeout = (tmpTimeOut < maxNotificationInterval?
+                            tmpTimeOut : maxNotificationInterval);
+                    LOG.info("Notification time out: " + notTimeout);
                 }
-                else{                    
-                    switch (n.state) { 
+                else{
+                    switch (n.state) {
                     case LOOKING:
                         // If notification > current, replace and send messages out
-                        LOG.info("Notification: " + n.leader + ", " + n.zxid
-                                + ", " + n.epoch + ", " + self.getId() + ", "
-                                + self.getPeerState() + ", " + n.state + ", "
-                                + n.sid);
                         if (n.epoch > logicalclock) {
                             logicalclock = n.epoch;
                             recvset.clear();
@@ -702,7 +712,7 @@ public class FastLeaderElection implemen
                             sendNotifications();
                         } else if (n.epoch < logicalclock) {
                             if(LOG.isDebugEnabled()){
-                                LOG.debug("Notification epoch is smaller than logicalclock.
n.epoch = " + n.epoch 
+                                LOG.debug("Notification epoch is smaller than logicalclock.
n.epoch = " + n.epoch
                                         + ", Logical clock" + logicalclock);
                             }
                             break;
@@ -712,29 +722,29 @@ public class FastLeaderElection implemen
                             updateProposal(n.leader, n.zxid);
                             sendNotifications();
                         }
-                    
+
                         if(LOG.isDebugEnabled()){
                             LOG.debug("Adding vote: From = " + n.sid +
                                     ", Proposed leader = " + n.leader +
                                     ", Porposed zxid = " + n.zxid +
                                     ", Proposed epoch = " + n.epoch);
                         }
-                        
+
                         /*
-                         * Only proceed if the vote comes from a replica in the 
+                         * Only proceed if the vote comes from a replica in the
                          * voting view.
                          */
                         if(self.getVotingView().containsKey(n.sid)){
                             recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
-    
+
                             //If have received from all nodes, then terminate
                             if ((self.getVotingView().size() == recvset.size()) &&
                                     (self.getQuorumVerifier().getWeight(proposedLeader) !=
0)){
-                                self.setPeerState((proposedLeader == self.getId()) ? 
+                                self.setPeerState((proposedLeader == self.getId()) ?
                                         ServerState.LEADING: learningState());
                                 leaveInstance();
                                 return new Vote(proposedLeader, proposedZxid);
-                            
+
                             } else if (termPredicate(recvset,
                                     new Vote(proposedLeader, proposedZxid,
                                             logicalclock))) {
@@ -748,21 +758,21 @@ public class FastLeaderElection implemen
                                         break;
                                     }
                                 }
-                        
+
                                 /*
                                  * This predicate is true once we don't read any new
                                  * relevant message from the reception queue
                                  */
                                 if (n == null) {
-                                    self.setPeerState((proposedLeader == self.getId()) ?

+                                    self.setPeerState((proposedLeader == self.getId()) ?
                                             ServerState.LEADING: learningState());
                                     if(LOG.isDebugEnabled()){
                                         LOG.debug("About to leave FLE instance: Leader= "
-                                            + proposedLeader + ", Zxid = " + 
+                                            + proposedLeader + ", Zxid = " +
                                             proposedZxid + ", My id = " + self.getId()
                                             + ", My state = " + self.getPeerState());
                                     }
-                                    
+
                                     leaveInstance();
                                     return new Vote(proposedLeader,
                                             proposedZxid);
@@ -778,7 +788,7 @@ public class FastLeaderElection implemen
                          * There is at most one leader for each epoch, so if a
                          * peer claims to be the leader for an epoch, then that
                          * peer must be the leader (no* arbitrary failures
-                         * assumed). Now, if there is no quorum supporting 
+                         * assumed). Now, if there is no quorum supporting
                          * this leader, then processes will naturally move
                          * to a new epoch.
                          */
@@ -788,39 +798,34 @@ public class FastLeaderElection implemen
                                     (termPredicate(recvset, new Vote(n.leader,
                                             n.zxid, n.epoch, n.state))
                                             && checkLeader(outofelection, n.leader,
n.epoch)) ){
-                                self.setPeerState((n.leader == self.getId()) ? 
+                                self.setPeerState((n.leader == self.getId()) ?
                                         ServerState.LEADING: learningState());
-                       
+
                                 leaveInstance();
                                 return new Vote(n.leader, n.zxid);
-                            } 
+                            }
                         }
-                    
-                        LOG.info("Notification: " + n.leader + ", " + n.zxid + 
-                                ", " + n.epoch + ", " + self.getId() + ", " + 
-                                self.getPeerState() + ", " + n.state + ", "
-                                + n.sid);
-           
+
                         outofelection.put(n.sid, new Vote(n.leader, n.zxid,
                                 n.epoch, n.state));
-    
+
                         if (termPredicate(outofelection, new Vote(n.leader,
                                 n.zxid, n.epoch, n.state))
                                 && checkLeader(outofelection, n.leader, n.epoch))
{
                             synchronized(this){
                                 logicalclock = n.epoch;
-                                self.setPeerState((n.leader == self.getId()) ? 
+                                self.setPeerState((n.leader == self.getId()) ?
                                         ServerState.LEADING: learningState());
                             }
                             leaveInstance();
                             return new Vote(n.leader, n.zxid);
                         }
-                        
+
                         break;
                     }
                 }
             }
-    
+
             return null;
         } finally {
             try {



Mime
View raw message