hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r739075 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/
Date Thu, 29 Jan 2009 23:11:03 GMT
Author: mahadev
Date: Thu Jan 29 23:11:02 2009
New Revision: 739075

URL: http://svn.apache.org/viewvc?rev=739075&view=rev
Log:
 ZOOKEEPER-275. Bug in FastLeaderElection. (flavio via mahadev)

Added:
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=739075&r1=739074&r2=739075&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Thu Jan 29 23:11:02 2009
@@ -74,6 +74,8 @@
   ZOOKEEPER-267.  java client incorrectly generating syncdisconnected event when in disconnected state. (pat via breed)
 
   ZOOKEEPER-263. document connection host:port as comma separated list in forrest docs (pat via breed)
+  
+  ZOOKEEPER-275. Bug in FastLeaderElection. (flavio via mahadev)
  
 IMPROVEMENTS:
    

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=739075&r1=739074&r2=739075&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Thu Jan 29 23:11:02 2009
@@ -186,6 +186,7 @@
             			response = manager.recvQueue.take();
             			
             			// Receive new message
+            			LOG.debug("Receive new message.");
             			if (response.buffer.capacity() < 28) {
             				LOG.error("Got a short response: "
             						+ response.buffer.capacity());
@@ -246,7 +247,7 @@
             			    Vote current = self.getCurrentVote();
             			    if(ackstate == QuorumPeer.ServerState.LOOKING){
 
-            			        
+            			     	LOG.info("Sending new notification.");   
             			        ToSend notmsg = new ToSend(
             			                ToSend.mType.notification, 
             			                current.id, 
@@ -395,11 +396,16 @@
     private void leaveInstance() {
         recvqueue.clear();
     }
-
+    
+    public QuorumCnxManager getCnxManager(){
+    	return manager;
+    }
+    
     public void shutdown(){
         manager.halt();
     }
 
+    
     /**
      * Send notifications to all peers upon a change in our vote
      */
@@ -425,9 +431,10 @@
      * @param id    Server identifier
      * @param zxid  Last zxid observed by the issuer of this vote
      */
-    private boolean totalOrderPredicate(long id, long zxid) {
-        if ((zxid > proposedZxid)
-                || ((zxid == proposedZxid) && (id > proposedLeader)))
+    private boolean totalOrderPredicate(long newId, long newZxid, long curId, long curZxid) {
+        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: " + newZxid + ", proposed zxid: " + curZxid);
+        if ((newZxid > curZxid)
+                || ((newZxid == curZxid) && (newId > curId)))
             return true;
         else
             return false;
@@ -557,15 +564,21 @@
                     if (n.epoch > logicalclock) {
                         logicalclock = n.epoch;
                         recvset.clear();
-                        updateProposal(self.getId(), self.getLastLoggedZxid());
+                        if(totalOrderPredicate(n.leader, n.zxid, self.getId(), self.getLastLoggedZxid()))
+                            updateProposal(n.leader, n.zxid);
+                        else
+                            updateProposal(self.getId(), self.getLastLoggedZxid());
                         sendNotifications();
                     } else if (n.epoch < logicalclock) {
+                        LOG.info("n.epoch < logicalclock");
                         break;
-                    } else if (totalOrderPredicate(n.leader, n.zxid)) {
+                    } else if (totalOrderPredicate(n.leader, n.zxid, proposedLeader, proposedZxid)) {
+                        LOG.info("Updating proposal");
                         updateProposal(n.leader, n.zxid);
                         sendNotifications();
                     }
                 
+                    LOG.info("Adding vote");
                     recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
 
                     //If have received from all nodes, then terminate
@@ -581,7 +594,7 @@
 
                         // Verify if there is any change in the proposed leader
                         while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
-                            if(totalOrderPredicate(n.leader, n.zxid)){
+                            if(totalOrderPredicate(n.leader, n.zxid, proposedLeader, proposedZxid)){
                                 recvqueue.put(n);
                                 break;
                             }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=739075&r1=739074&r2=739075&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Thu Jan 29 23:11:02 2009
@@ -128,7 +128,7 @@
 
         // Generates a challenge to guarantee one connection between pairs of
         // servers
-        genChallenge();
+        //genChallenge();
 
         // Starts listener thread that waits for connection requests 
         listener = new Listener();
@@ -364,11 +364,12 @@
      */
     boolean haveDelivered() {
         for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
-            if (queue.size() != 0)
-                return false;
+            LOG.debug("Queue size: " + queue.size());
+            if (queue.size() == 0)
+                return true;
         }
 
-        return true;
+        return false;
     }
 
     /**
@@ -376,13 +377,17 @@
      */
     public void halt() {
         shutdown = true;
-        LOG.info("Halting listener");
+        LOG.debug("Halting listener");
         listener.halt();
         
-        for(SendWorker sw: senderWorkerMap.values()){
-            LOG.info("Halting sender: " + sw);
-            sw.finish();
-        }
+        softHalt();
+    }
+   
+    public void softHalt(){
+    	for(SendWorker sw: senderWorkerMap.values()){
+    		LOG.debug("Halting sender: " + sw);
+    		sw.finish();
+    	}   	
     }
 
     /**
@@ -401,6 +406,7 @@
                 ss = ServerSocketChannel.open();
                 int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
                 LOG.info("My election bind port: " + port);
+                ss.socket().setReuseAddress(true); 
                 ss.socket().bind(new InetSocketAddress(port));
 
                 while (!shutdown) {
@@ -410,6 +416,8 @@
                     
                     LOG.info("Connection request "
                             + sock.getRemoteSocketAddress());
+                    //synchronized(senderWorkerMap){
+                    LOG.info("Connection request: " + self.getId());
                     receiveConnection(client);
                 }
             } catch (IOException e) {
@@ -419,7 +427,7 @@
         
         void halt(){
             try{
-                if(ss != null) ss.close();
+                if((ss != null) && (ss.isOpen())) ss.close();
             } catch (IOException e){
                 LOG.warn("Exception when shutting down listener: " + e);
             }
@@ -453,6 +461,7 @@
         boolean finish() {
             running = false;
 
+            LOG.debug("Calling finish");
             this.interrupt();
             if (recvWorker != null)
                 recvWorker.finish();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=739075&r1=739074&r2=739075&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Jan 29 23:11:02 2009
@@ -483,7 +483,9 @@
             follower.shutdown();
         }
         cnxnFactory.shutdown();
-        udpSocket.close();
+        if(udpSocket != null) {
+            udpSocket.close();
+        }
     }
 
     public String[] getQuorumPeers() {

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java?rev=739075&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java Thu Jan 29 23:11:02 2009
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumStats;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+
+import junit.framework.TestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FLENewEpochTest extends TestCase {
+    protected static final Logger LOG = Logger.getLogger(FLENewEpochTest.class);
+
+    int count;
+    int baseport;
+    int baseLEport;
+    HashMap<Long,QuorumServer> peers;
+    ArrayList<LEThread> threads;
+    File tmpdir[];
+    int port[];
+    int[] round;
+    
+    @Override
+    public void setUp() throws Exception {
+        count = 3;
+        baseport= 33303;
+        baseLEport = 43303;
+
+        peers = new HashMap<Long,QuorumServer>(count);
+        threads = new ArrayList<LEThread>(count);
+        tmpdir = new File[count];
+        port = new int[count];
+
+        round = new int[3];
+        round[0] = 0;
+        round[1] = 0;
+        round[2] = 0;
+        LOG.info("SetUp " + getName());
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        for(int i = 0; i < threads.size(); i++) {
+            ((FastLeaderElection) threads.get(i).peer.getElectionAlg()).shutdown();
+        }
+        LOG.info("FINISHED " + getName());
+    }
+
+
+    class LEThread extends Thread {
+        FastLeaderElection le;
+        int i;
+        QuorumPeer peer;
+
+        LEThread(QuorumPeer peer, int i) {
+            this.i = i;
+            this.peer = peer;
+            LOG.info("Constructor: " + getName());
+            
+        }
+
+        public void run(){
+        	boolean flag = true;
+            try{
+            	while(flag){
+            		Vote v = null;
+            		peer.setPeerState(ServerState.LOOKING);
+            		LOG.info("Going to call leader election again: " + i);
+            		v = peer.getElectionAlg().lookForLeader();
+
+            		if(v == null){
+            			fail("Thread " + i + " got a null vote");
+            		}
+
+            		/*
+            		 * A real zookeeper would take care of setting the current vote. Here
+            		 * we do it manually.
+            		 */
+            		peer.setCurrentVote(v);
+
+            		LOG.info("Finished election: " + i + ", " + v.id);
+            		//votes[i] = v;
+
+            		switch(i){
+            		case 0:
+            			LOG.info("First peer, do nothing, just join");
+            			flag = false;
+            			break;
+            		case 1:
+            			LOG.info("Second entering case");
+            			if(round[1] != 0) flag = false;
+            			else{
+            				while(round[2] == 0){
+            					Thread.sleep(200);
+            				}
+            			}
+            			LOG.info("Second is going to start second round");
+            			round[1]++;
+            			break;
+            		case 2:
+            			LOG.info("Third peer, shutting it down");
+            			((FastLeaderElection) peer.getElectionAlg()).shutdown();
+            			peer.shutdown();
+            			flag = false;
+            			round[2] = 1;
+            			LOG.info("Third leaving");
+            			break;
+            		}
+            	}
+            } catch (Exception e){
+            	e.printStackTrace();
+            }    
+        }
+    }
+
+
+      @Test
+      public void testLENewEpoch() throws Exception {
+
+          FastLeaderElection le[] = new FastLeaderElection[count];
+
+          LOG.info("TestLE: " + getName()+ ", " + count);
+          for(int i = 0; i < count; i++) {
+              peers.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress(baseport+100+i),
+                      new InetSocketAddress(baseLEport+100+i)));
+              tmpdir[i] = File.createTempFile("letest", "test");
+              port[i] = baseport+i;
+          }
+
+          for(int i = 1; i < le.length; i++) {
+              QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2);
+              peer.startLeaderElection();
+              LEThread thread = new LEThread(peer, i);
+              thread.start();
+              threads.add(thread);
+          }
+          Thread.sleep(2000);
+          QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
+          peer.startLeaderElection();
+          LEThread thread = new LEThread(peer, 0);
+          thread.start();
+          threads.add(thread);
+          
+          LOG.info("Started threads " + getName());
+
+          for(int i = 0; i < threads.size(); i++) {
+              threads.get(i).join(10000);
+              if (threads.get(i).isAlive()) {
+                  fail("Threads didn't join");
+              }
+
+          }
+      }
+  }



Mime
View raw message