hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject svn commit: r891034 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/
Date Tue, 15 Dec 2009 22:23:56 GMT
Author: breed
Date: Tue Dec 15 22:23:55 2009
New Revision: 891034

URL: http://svn.apache.org/viewvc?rev=891034&view=rev
Log:
ZOOKEEPER-599. Changes to FLE and QuorumCnxManager to support Observers
ZOOKEEPER-506. QuorumBase should use default leader election

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Dec 15 22:23:55 2009
@@ -192,6 +192,10 @@
 
   ZOOKEEPER-425. Add OSGi metadata to zookeeper.jar (david bosschaert via breed)
 
+  ZOOKEEPER-599. Changes to FLE and QuorumCnxManager to support Observers (fpj via breed)
+
+  ZOOKEEPER-506. QuorumBase should use default leader election (fpj via breed)
+
 NEW FEATURES:
   ZOOKEEPER-539. generate eclipse project via ant target. (phunt via mahadev)
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Tue Dec 15 22:23:55 2009
@@ -29,6 +29,7 @@
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 
@@ -188,81 +189,110 @@
             		try{
             			response = manager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
             			if(response == null) continue;
-            			
-            			// Receive new message
-            			LOG.debug("Receive new message.");
-            			if (response.buffer.capacity() < 28) {
-            				LOG.error("Got a short response: "
-            						+ response.buffer.capacity());
-            				continue;
-            			}
-            			response.buffer.clear();
-               
-            			// State of peer that sent this message
-            			QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
-            			switch (response.buffer.getInt()) {
-            			case 0:
-            				ackstate = QuorumPeer.ServerState.LOOKING;
-            				break;
-            			case 1:
-            				ackstate = QuorumPeer.ServerState.FOLLOWING;
-            				break;
-            			case 2:
-            				ackstate = QuorumPeer.ServerState.LEADING;
-            				break;
-            			}
-                    	
-            			// Instantiate Notification and set its attributes
-            			Notification n = new Notification();
-            			n.leader = response.buffer.getLong();
-            			n.zxid = response.buffer.getLong();
-            			n.epoch = response.buffer.getLong();
-            			n.state = ackstate;
-            			n.sid = response.sid;
 
             			/*
-            			 * If this server is looking, then send proposed leader
+            			 * If it is from an observer, respond right away.
+            			 * Note that the following predicate assumes that
+            			 * if a server is not a follower, then it must be
+            			 * an observer. If we ever have any other type of
+            			 * learner in the future, we'll have to change the
+            			 * way we check for observers.
             			 */
+            			if(!self.getVotingView().containsKey(response.sid)){
+            			    Vote current = self.getCurrentVote();
+            			    ToSend notmsg = new ToSend(ToSend.mType.notification, 
+            			            current.id, 
+                                    current.zxid,
+            	                    logicalclock,
+            	                    self.getPeerState(),
+            	                    response.sid);
+
+            	            sendqueue.offer(notmsg);
+            			} else {           			
+            			    // Receive new message
+            			    if (LOG.isDebugEnabled()) {
+            			        LOG.debug("Receive new notification message. My id = " 
+            			                + self.getId());
+            			    }
+            			    
+            			    if (response.buffer.capacity() < 28) {
+            			        LOG.error("Got a short response: "
+            			                + response.buffer.capacity());
+            			        continue;
+            			    }
+            			    response.buffer.clear();
+
+            			    // State of peer that sent this message
+            			    QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
+            			    switch (response.buffer.getInt()) {
+            			    case 0:
+            			        ackstate = QuorumPeer.ServerState.LOOKING;
+            			        break;
+            			    case 1:
+            			        ackstate = QuorumPeer.ServerState.FOLLOWING;
+            			        break;
+            			    case 2:
+            			        ackstate = QuorumPeer.ServerState.LEADING;
+            			        break;
+            			    case 3:
+                                ackstate = QuorumPeer.ServerState.OBSERVING;
+                                break;
+            			    }
+            			    
+            			    // Instantiate Notification and set its attributes
+            			    Notification n = new Notification();
+            			    n.leader = response.buffer.getLong();
+            			    n.zxid = response.buffer.getLong();
+            			    n.epoch = response.buffer.getLong();
+            			    n.state = ackstate;
+            			    n.sid = response.sid;
 
-            			if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
-            				recvqueue.offer(n);
-            				if(recvqueue.size() == 0) LOG.debug("Message: " + n.sid);
-            				/*
-            				 * Send a notification back if the peer that sent this
-            				 * message is also looking and its logical clock is 
-            				 * lagging behind.
-            				 */
-            				if((ackstate == QuorumPeer.ServerState.LOOKING)
-            						&& (n.epoch < logicalclock)){
-            				    Vote v = getVote();
-            					ToSend notmsg = new ToSend(ToSend.mType.notification, 
-                						v.id, 
-                						v.zxid,
-                						logicalclock,
-                						self.getPeerState(),
-                						response.sid);
-                				sendqueue.offer(notmsg);
-            				}
-            			} else {
             			    /*
-            			     * If this server is not looking, but the one that sent the ack
-            			     * is looking, then send back what it believes to be the leader.
+            			     * If this server is looking, then send proposed leader
             			     */
-            			    Vote current = self.getCurrentVote();
-            			    if(ackstate == QuorumPeer.ServerState.LOOKING){
 
-            			     	LOG.info("Sending new notification.");   
-            			        ToSend notmsg = new ToSend(
-            			                ToSend.mType.notification, 
-            			                current.id, 
-            			                current.zxid,
-            			                logicalclock,
-            			                self.getPeerState(),
-            			                response.sid);
-            			        sendqueue.offer(notmsg);
-            				}
+            			    if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
+            			        recvqueue.offer(n);
+
+            			        /*
+            			         * Send a notification back if the peer that sent this
+            			         * message is also looking and its logical clock is 
+            			         * lagging behind.
+            			         */
+            			        if((ackstate == QuorumPeer.ServerState.LOOKING)
+            			                && (n.epoch < logicalclock)){
+            			            Vote v = getVote();
+            			            ToSend notmsg = new ToSend(ToSend.mType.notification, 
+            			                    v.id, 
+            			                    v.zxid,
+            			                    logicalclock,
+            			                    self.getPeerState(),
+            			                    response.sid);
+            			            sendqueue.offer(notmsg);
+            			        }
+            			    } else {
+            			        /*
+            			         * If this server is not looking, but the one that sent the ack
+            			         * is looking, then send back what it believes to be the leader.
+            			         */
+            			        Vote current = self.getCurrentVote();
+            			        if(ackstate == QuorumPeer.ServerState.LOOKING){
+            			            if(LOG.isDebugEnabled()){
+            			                LOG.debug("Sending new notification. My id =  " +
+            			                        self.getId() + ", Recipient = " +
+            			                        response.sid);
+            			            }
+            			            ToSend notmsg = new ToSend(
+            			                    ToSend.mType.notification, 
+            			                    current.id, 
+            			                    current.zxid,
+            			                    logicalclock,
+            			                    self.getPeerState(),
+            			                    response.sid);
+            			            sendqueue.offer(notmsg);
+            			        }
+            			    }
             			}
-            			
             		} catch (InterruptedException e) {
             			System.out.println("Interrupted Exception while waiting for new message" +
             					e.toString());
@@ -550,9 +580,49 @@
     }
     
     /**
+     * A learning state can be either FOLLOWING or OBSERVING. 
+     * This method simply decides which one depending on the 
+     * role of the server.
+     * 
+     * @return ServerState
+     */
+    private ServerState learningState(){
+        if(self.getPeerType() == LearnerType.PARTICIPANT){
+            LOG.debug("I'm a participant: " + self.getId());
+            return ServerState.FOLLOWING;
+        }
+        else{ 
+            LOG.debug("I'm an observer: " + self.getId());
+            return ServerState.OBSERVING;
+        }
+    }
+    
+    /**
+     * Returns the initial vote value of server identifier.
+     * 
+     * @return long
+     */
+    private long getInitId(){
+        if(self.getPeerType() == LearnerType.PARTICIPANT)
+            return self.getId();
+        else return Long.MIN_VALUE;
+    }
+    
+    /**
+     * Returns initial last logged zxid.
+     * 
+     * @return long
+     */
+    private long getInitLastLoggedZxid(){
+        if(self.getPeerType() == LearnerType.PARTICIPANT)
+            return self.getLastLoggedZxid();
+        else return Long.MIN_VALUE;
+    }
+    
+    /**
      * Starts a new round of leader election. Whenever our QuorumPeer 
      * changes its state to LOOKING, this method is invoked, and it 
-     * sends notifications to al other peers.
+     * sends notifications to all other peers.
      */
     public Vote lookForLeader() throws InterruptedException {
         try {
@@ -573,10 +643,11 @@
             
             synchronized(this){
                 logicalclock++;
-                updateProposal(self.getId(), self.getLastLoggedZxid());
+                    updateProposal(getInitId(), getInitLastLoggedZxid());
             }
             
-            LOG.info("New election: " + proposedZxid);
+            LOG.info("New election. My id =  " + self.getId() +
+                    ", Proposed zxid = " + proposedZxid);
             sendNotifications();
     
             /*
@@ -623,14 +694,17 @@
                             logicalclock = n.epoch;
                             recvset.clear();
                             if(totalOrderPredicate(n.leader, n.zxid,
-                                    self.getId(), self.getLastLoggedZxid()))
+                                    getInitId(), getInitLastLoggedZxid()))
                                 updateProposal(n.leader, n.zxid);
                             else
-                                updateProposal(self.getId(),
-                                        self.getLastLoggedZxid());
+                                updateProposal(getInitId(),
+                                        getInitLastLoggedZxid());
                             sendNotifications();
                         } else if (n.epoch < logicalclock) {
-                            LOG.info("n.epoch < logicalclock");
+                            if(LOG.isDebugEnabled()){
+                                LOG.debug("Notification epoch is smaller than logicalclock. n.epoch = " + n.epoch 
+                                        + ", Logical clock" + logicalclock);
+                            }
                             break;
                         } else if (totalOrderPredicate(n.leader, n.zxid,
                                 proposedLeader, proposedZxid)) {
@@ -639,47 +713,66 @@
                             sendNotifications();
                         }
                     
-                        LOG.info("Adding vote");
-
-                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
+                        if(LOG.isDebugEnabled()){
+                            LOG.debug("Adding vote: From = " + n.sid +
+                                    ", Proposed leader = " + n.leader +
+                                    ", Porposed zxid = " + n.zxid +
+                                    ", Proposed epoch = " + n.epoch);
+                        }
+                        
+                        /*
+                         * Only proceed if the vote comes from a replica in the 
+                         * voting view.
+                         */
+                        if(self.getVotingView().containsKey(n.sid)){
+                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
     
-                        //If have received from all nodes, then terminate
-                        if ((self.quorumPeers.size() == recvset.size()) &&
-                                (self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
-                            self.setPeerState((proposedLeader == self.getId()) ? 
-                                    ServerState.LEADING: ServerState.FOLLOWING);
-                            leaveInstance();
-                            return new Vote(proposedLeader, proposedZxid);
+                            //If have received from all nodes, then terminate
+                            if ((self.getVotingView().size() == recvset.size()) &&
+                                    (self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
+                                self.setPeerState((proposedLeader == self.getId()) ? 
+                                        ServerState.LEADING: learningState());
+                                leaveInstance();
+                                return new Vote(proposedLeader, proposedZxid);
                             
-                        } else if (termPredicate(recvset,
-                                new Vote(proposedLeader, proposedZxid,
-                                        logicalclock))) {
-                            //Otherwise, wait for a fixed amount of time
-                            LOG.debug("Passed predicate");
-    
-                            // Verify if there is any change in the proposed leader
-                            while((n = recvqueue.poll(finalizeWait,
-                                    TimeUnit.MILLISECONDS)) != null){
-                                if(totalOrderPredicate(n.leader, n.zxid,
-                                        proposedLeader, proposedZxid)){
-                                    recvqueue.put(n);
-                                    break;
+                            } else if (termPredicate(recvset,
+                                    new Vote(proposedLeader, proposedZxid,
+                                            logicalclock))) {
+
+                                // Verify if there is any change in the proposed leader
+                                while((n = recvqueue.poll(finalizeWait,
+                                        TimeUnit.MILLISECONDS)) != null){
+                                    if(totalOrderPredicate(n.leader, n.zxid,
+                                            proposedLeader, proposedZxid)){
+                                        recvqueue.put(n);
+                                        break;
+                                    }
                                 }
-                            }
                         
-                            if (n == null) {
-                                self.setPeerState((proposedLeader == self.getId()) ? 
-                                    ServerState.LEADING: ServerState.FOLLOWING);
-                                LOG.info("About to leave instance:"
-                                        + proposedLeader + ", " + 
-                                        proposedZxid + ", " + self.getId()
-                                        + ", " + self.getPeerState());
-                                leaveInstance();
-                                return new Vote(proposedLeader,
-                                    proposedZxid);
+                                /*
+                                 * This predicate is true once we don't read any new
+                                 * relevant message from the reception queue
+                                 */
+                                if (n == null) {
+                                    self.setPeerState((proposedLeader == self.getId()) ? 
+                                            ServerState.LEADING: learningState());
+                                    if(LOG.isDebugEnabled()){
+                                        LOG.debug("About to leave FLE instance: Leader= "
+                                            + proposedLeader + ", Zxid = " + 
+                                            proposedZxid + ", My id = " + self.getId()
+                                            + ", My state = " + self.getPeerState());
+                                    }
+                                    
+                                    leaveInstance();
+                                    return new Vote(proposedLeader,
+                                            proposedZxid);
+                                }
                             }
                         }
                         break;
+                    case OBSERVING:
+                        LOG.debug("Notification from observer: " + n.sid);
+                        break;
                     default:
                         /*
                          * There is at most one leader for each epoch, so if a
@@ -696,7 +789,7 @@
                                             n.zxid, n.epoch, n.state))
                                             && checkLeader(outofelection, n.leader, n.epoch)) ){
                                 self.setPeerState((n.leader == self.getId()) ? 
-                                        ServerState.LEADING: ServerState.FOLLOWING);
+                                        ServerState.LEADING: learningState());
                        
                                 leaveInstance();
                                 return new Vote(n.leader, n.zxid);
@@ -717,7 +810,7 @@
                             synchronized(this){
                                 logicalclock = n.epoch;
                                 self.setPeerState((n.leader == self.getId()) ? 
-                                        ServerState.LEADING: ServerState.FOLLOWING);
+                                        ServerState.LEADING: learningState());
                             }
                             leaveInstance();
                             return new Vote(n.leader, n.zxid);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Tue Dec 15 22:23:55 2009
@@ -26,6 +26,7 @@
 import java.nio.channels.SocketChannel;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.Enumeration;
+import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -68,6 +69,12 @@
     static final int MAX_CONNECTION_ATTEMPTS = 2;
     
     /*
+     * Negative counter for observer server ids.
+     */
+    
+    private long observerCounter = -1;
+    
+    /*
      * Local IP address
      */
     final QuorumPeer self;
@@ -185,6 +192,8 @@
         return false;
     }
 
+    
+    
     /**
      * If this server receives a connection request, then it gives up on the new
      * connection if it wins. Notice that it checks whether it has a connection
@@ -196,7 +205,6 @@
         Long sid = null;
         
         try {
-            // Sending challenge and sid
             byte[] msgBytes = new byte[8];
             ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
                 
@@ -205,8 +213,17 @@
                 
             // Read server id
             sid = Long.valueOf(msgBuffer.getLong());
+            if(sid == QuorumPeer.OBSERVER_ID){
+                /*
+                 * Assign the next value of the negative observer counter
+                 * as identifier. We need a value to identify the connection.
+                 */
+                
+                sid = observerCounter--;
+                LOG.info("Setting arbitrary identifier to observer: " + sid);
+            }
         } catch (IOException e) {
-            LOG.info("Exception reading or writing challenge: "
+            LOG.warn("Exception reading or writing challenge: "
                     + e.toString());
             return false;
         }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Dec 15 22:23:55 2009
@@ -117,7 +117,7 @@
         LOOKING, FOLLOWING, LEADING, OBSERVING;
     }
     
-    /**
+    /*
      * A peer can either be participating, which implies that it is willing to
      * both vote in instances of consensus and to elect or become a Leader, or
      * it may be observing in which case it isn't.
@@ -129,6 +129,17 @@
         PARTICIPANT, OBSERVER;
     }
     
+    /*
+     * To enable observers to have no identifier, we need a generic identifier
+     * at least for QuorumCnxManager. We use the following constant as the
+     * value of such a generic identifier. 
+     */
+    
+    static final long OBSERVER_ID = Long.MAX_VALUE;
+    
+    /*
+     * Default value of peer is participant
+     */
     private LearnerType peerType = LearnerType.PARTICIPANT;
     
     public LearnerType getPeerType() {
@@ -352,9 +363,7 @@
     @Override
     public synchronized void start() {
         cnxnFactory.start();        
-        if (getPeerType() == LearnerType.PARTICIPANT) {
-            startLeaderElection();
-        }
+        startLeaderElection();
         super.start();
     }
 
@@ -513,10 +522,9 @@
     protected Election makeLEStrategy(){
         LOG.debug("Initializing leader election protocol...");
 
-        // LeaderElection is the only implementation that correctly
-        // transitions between LOOKING and OBSERVER
-        if(electionAlg==null)
+        if(electionAlg==null){
             return new LeaderElection(this);
+        }
         return electionAlg;
     }
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Tue Dec 15 22:23:55 2009
@@ -199,10 +199,7 @@
                 System.setProperty("zookeeper." + key, value);
             }
         }
-        if (observers.size() > 0 && electionAlg != 0) {
-            throw new IllegalArgumentException("Observers must currently be used with simple leader election" +
-            		" (set electionAlg=0)");
-        }
+       
         if (dataDir == null) {
             throw new IllegalArgumentException("dataDir is not set");
         }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java Tue Dec 15 22:23:55 2009
@@ -27,6 +27,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -63,18 +64,27 @@
     @Test
     public void testObserver() throws Exception {
         ClientBase.setupTestEnv();
-        final int CLIENT_PORT_QP1 = 3181;
-        final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3;
-        final int CLIENT_PORT_OBS = CLIENT_PORT_QP2 + 3;
+        
+        final int PORT_QP1 = PortAssignment.unique();
+        final int PORT_QP2 = PortAssignment.unique();
+        final int PORT_OBS = PortAssignment.unique();
+        final int PORT_QP_LE1 = PortAssignment.unique();
+        final int PORT_QP_LE2 = PortAssignment.unique();
+        final int PORT_OBS_LE = PortAssignment.unique();
+
+        final int CLIENT_PORT_QP1 = PortAssignment.unique();
+        final int CLIENT_PORT_QP2 = PortAssignment.unique();
+        final int CLIENT_PORT_OBS = PortAssignment.unique();
 
+        
         String quorumCfgSection =
-            "electionAlg=0\n" + 
-            "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
-            + ":" + (CLIENT_PORT_QP1 + 2)
-            + "\nserver.2=localhost:" + (CLIENT_PORT_QP2 + 1)
-            + ":" + (CLIENT_PORT_QP2 + 2)
+            "electionAlg=3\n" + 
+            "server.1=localhost:" + (PORT_QP1)
+            + ":" + (PORT_QP_LE1)
+            + "\nserver.2=localhost:" + (PORT_QP2)
+            + ":" + (PORT_QP_LE2)
             + "\nserver.3=localhost:" 
-            + (CLIENT_PORT_OBS+1)+ ":" + (CLIENT_PORT_OBS + 2) + ":observer";
+            + (PORT_OBS)+ ":" + (PORT_OBS_LE) + ":observer";
         String obsCfgSection =  quorumCfgSection + "\npeerType=observer";
         MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
         MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
@@ -175,11 +185,12 @@
     @Test
     public void testSingleObserver() throws IOException{
         ClientBase.setupTestEnv();
-        final int CLIENT_PORT_QP1 = 3181;        
-
+        final int CLIENT_PORT_QP1 = PortAssignment.unique();        
+        final int CLIENT_PORT_QP2 = PortAssignment.unique();
+        
         String quorumCfgSection =
-            "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
-            + ":" + (CLIENT_PORT_QP1 + 2) + "\npeerType=observer";
+            "server.1=localhost:" + (CLIENT_PORT_QP1)
+            + ":" + (CLIENT_PORT_QP2) + "\npeerType=observer";
                     
         MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
         q1.start();
@@ -190,53 +201,4 @@
         q1.shutdown();
     }    
     
-    /**
-     * Check that an attempt to instantiate an ensemble with observers and
-     * electionAlg != 0 fails (this will be removed when the restriction is). 
-     * @throws Exception
-     */
-    @Test
-    public void testLeaderElectionFail() throws Exception {        
-        ClientBase.setupTestEnv();
-        final int CLIENT_PORT_QP1 = 3181;
-        final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3;
-        final int CLIENT_PORT_OBS = CLIENT_PORT_QP2 + 3;
-
-        String quorumCfgSection =
-            "electionAlg=1\n" + 
-            "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
-            + ":" + (CLIENT_PORT_QP1 + 2)
-            + "\nserver.2=localhost:" + (CLIENT_PORT_QP2 + 1)
-            + ":" + (CLIENT_PORT_QP2 + 2)
-            + "\nserver.3=localhost:" 
-            + (CLIENT_PORT_OBS+1)+ ":" + (CLIENT_PORT_OBS + 2) + ":observer";
-        QuorumPeerConfig qpc = new QuorumPeerConfig();
-        
-        File tmpDir = ClientBase.createTmpDir();
-        File confFile = new File(tmpDir, "zoo.cfg");
-
-        FileWriter fwriter = new FileWriter(confFile);
-        fwriter.write("tickTime=2000\n");
-        fwriter.write("initLimit=10\n");
-        fwriter.write("syncLimit=5\n");
-
-        File dataDir = new File(tmpDir, "data");
-        if (!dataDir.mkdir()) {
-            throw new IOException("Unable to mkdir " + dataDir);
-        }
-        fwriter.write("dataDir=" + dataDir.toString() + "\n");
-
-        fwriter.write("clientPort=" + CLIENT_PORT_QP1 + "\n");
-        fwriter.write(quorumCfgSection + "\n");
-        fwriter.flush();
-        fwriter.close();
-        try {
-            qpc.parse(confFile.toString());
-        } catch (ConfigException e) {
-            LOG.info("Config exception caught as expected: " + e.getCause());
-            return;
-        }
-        
-        assertTrue("Didn't get the expected config exception", false);        
-    } 
 }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Tue Dec 15 22:23:55 2009
@@ -30,6 +30,7 @@
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -47,6 +48,12 @@
     private int port3;
     private int port4;
     private int port5;
+    
+    private int portLE1;
+    private int portLE2;
+    private int portLE3;
+    private int portLE4;
+    private int portLE5;
 
     @Override
     protected void setUp() throws Exception {
@@ -66,11 +73,18 @@
         port3 = PortAssignment.unique();
         port4 = PortAssignment.unique();
         port5 = PortAssignment.unique();
-        hostPort = "127.0.0.1:" + port1
-            + ",127.0.0.1:" + port2
-            + ",127.0.0.1:" + port3
-            + ",127.0.0.1:" + port4
-            + ",127.0.0.1:" + port5;
+        
+        portLE1 = PortAssignment.unique();
+        portLE2 = PortAssignment.unique();
+        portLE3 = PortAssignment.unique();
+        portLE4 = PortAssignment.unique();
+        portLE5 = PortAssignment.unique();
+        
+        hostPort = "127.0.0.1:" + port1 + ":" + portLE1
+            + ",127.0.0.1:" + port2 + ":" + portLE2
+            + ",127.0.0.1:" + port3 + ":" + portLE3
+            + ",127.0.0.1:" + port4 + ":" + portLE4
+            + ",127.0.0.1:" + port5 + ":" + portLE5;
         LOG.info("Ports are: " + hostPort);
 
         s1dir = ClientBase.createTmpDir();
@@ -102,11 +116,26 @@
         int initLimit = 3;
         int syncLimit = 3;
         HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
-        peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", port1 + 1000)));
-        peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", port2 + 1000)));
-        peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", port3 + 1000)));
-        peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", port4 + 1000)));
-        peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", port5 + 1000)));
+        peers.put(Long.valueOf(1), new QuorumServer(1, 
+                new InetSocketAddress("127.0.0.1", port1 + 1000),
+                new InetSocketAddress("127.0.0.1", portLE1 + 1000),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(2), new QuorumServer(2, 
+                new InetSocketAddress("127.0.0.1", port2 + 1000),
+                new InetSocketAddress("127.0.0.1", portLE2 + 1000),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(3), new QuorumServer(3, 
+                new InetSocketAddress("127.0.0.1", port3 + 1000),
+                new InetSocketAddress("127.0.0.1", portLE3 + 1000),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(4), new QuorumServer(4, 
+                new InetSocketAddress("127.0.0.1", port4 + 1000),
+                new InetSocketAddress("127.0.0.1", portLE4 + 1000),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(5), new QuorumServer(5, 
+                new InetSocketAddress("127.0.0.1", port5 + 1000),
+                new InetSocketAddress("127.0.0.1", portLE5 + 1000),
+                LearnerType.PARTICIPANT));
         
         if (withObservers) {
             peers.get(Long.valueOf(4)).type = LearnerType.OBSERVER;        
@@ -114,19 +143,19 @@
         }
 
         LOG.info("creating QuorumPeer 1 port " + port1);
-        s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 0, 1, tickTime, initLimit, syncLimit);
+        s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit);
         assertEquals(port1, s1.getClientPort());
         LOG.info("creating QuorumPeer 2 port " + port2);
-        s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 0, 2, tickTime, initLimit, syncLimit);
+        s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit);
         assertEquals(port2, s2.getClientPort());
         LOG.info("creating QuorumPeer 3 port " + port3);
-        s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 0, 3, tickTime, initLimit, syncLimit);
+        s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit);
         assertEquals(port3, s3.getClientPort());
         LOG.info("creating QuorumPeer 4 port " + port4);
-        s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 0, 4, tickTime, initLimit, syncLimit);
+        s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit);
         assertEquals(port4, s4.getClientPort());
         LOG.info("creating QuorumPeer 5 port " + port5);
-        s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 0, 5, tickTime, initLimit, syncLimit);
+        s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit);
         assertEquals(port5, s5.getClientPort());
         
         if (withObservers) {
@@ -181,26 +210,42 @@
         int initLimit = 3;
         int syncLimit = 3;
         HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
-        peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", port1 + 1000)));
-        peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", port2 + 1000)));
-        peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", port3 + 1000)));
-        peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", port4 + 1000)));
-        peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", port5 + 1000)));
 
+        peers.put(Long.valueOf(1), new QuorumServer(1, 
+                new InetSocketAddress("127.0.0.1", port1 + 1000),
+                new InetSocketAddress("127.0.0.1", portLE1 + 1000),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(2), new QuorumServer(2, 
+                new InetSocketAddress("127.0.0.1", port2 + 1000),
+                new InetSocketAddress("127.0.0.1", portLE2 + 1000),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(3), new QuorumServer(3, 
+                new InetSocketAddress("127.0.0.1", port3 + 1000),
+                new InetSocketAddress("127.0.0.1", portLE3 + 1000),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(4), new QuorumServer(4, 
+                new InetSocketAddress("127.0.0.1", port4 + 1000),
+                new InetSocketAddress("127.0.0.1", portLE4 + 1000),
+                LearnerType.PARTICIPANT));
+        peers.put(Long.valueOf(5), new QuorumServer(5, 
+                new InetSocketAddress("127.0.0.1", port5 + 1000),
+                new InetSocketAddress("127.0.0.1", portLE5 + 1000),
+                LearnerType.PARTICIPANT));
+        
         LOG.info("creating QuorumPeer 1 port " + port1);
-        s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 0, 1, tickTime, initLimit, syncLimit);
+        s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit);
         assertEquals(port1, s1.getClientPort());
         LOG.info("creating QuorumPeer 2 port " + port2);
-        s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 0, 2, tickTime, initLimit, syncLimit);
+        s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit);
         assertEquals(port2, s2.getClientPort());
         LOG.info("creating QuorumPeer 3 port " + port3);
-        s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 0, 3, tickTime, initLimit, syncLimit);
+        s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit);
         assertEquals(port3, s3.getClientPort());
         LOG.info("creating QuorumPeer 4 port " + port4);
-        s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 0, 4, tickTime, initLimit, syncLimit);
+        s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit);
         assertEquals(port4, s4.getClientPort());
         LOG.info("creating QuorumPeer 5 port " + port5);
-        s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 0, 5, tickTime, initLimit, syncLimit);
+        s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit);
         assertEquals(port5, s5.getClientPort());
     }
 
@@ -242,6 +287,7 @@
     protected void shutdown(QuorumPeer qp) {
         try {
             qp.shutdown();
+            ((FastLeaderElection) qp.getElectionAlg()).shutdown();
             qp.join(30000);
             if (qp.isAlive()) {
                 fail("QP failed to shutdown in 30 seconds");



Mime
View raw message