hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject svn commit: r921201 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/
Date Wed, 10 Mar 2010 01:27:15 GMT
Author: breed
Date: Wed Mar 10 01:27:14 2010
New Revision: 921201

URL: http://svn.apache.org/viewvc?rev=921201&view=rev
Log:
ZOOKEEPER-684. Race in LENonTerminateTest

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=921201&r1=921200&r2=921201&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Mar 10 01:27:14 2010
@@ -7,6 +7,8 @@ BUGFIXES: 
 Backward compatible changes:
 
 BUGFIXES: 
+  ZOOKEEPER-59. Synchronized block in NIOServerCnxn (fpj via breed)
+
   ZOOKEEPER-524. DBSizeTest is not really testing anything (breed)
 
   ZOOKEEPER-469. make sure CPPUNIT_CFLAGS isn't overwritten
@@ -254,6 +256,8 @@ BUGFIXES: 
 
   ZOOKEEPER-691. Interface changed for NIOServer.Factory (breed via mahadev)
 
+  ZOOKEEPER-685.  Race in LENonTerminateTest (henry via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
   "socket reuse" and failure to close client (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=921201&r1=921200&r2=921201&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Wed Mar 10 01:27:14 2010
@@ -74,7 +74,7 @@ public class Follower extends Learner{
                 }
                 syncWithLeader(newLeaderZxid);                
                 QuorumPacket qp = new QuorumPacket();
-                while (self.running) {
+                while (self.isRunning()) {
                     readPacket(qp);
                     processPacket(qp);                   
                 }                              

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=921201&r1=921200&r2=921201&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Wed Mar 10 01:27:14 2010
@@ -41,9 +41,9 @@ import org.apache.zookeeper.server.quoru
 
 public class LeaderElection implements Election  {
     private static final Logger LOG = Logger.getLogger(LeaderElection.class);
-    private static Random epochGen = new Random();
+    protected static Random epochGen = new Random();
 
-    QuorumPeer self;
+    protected QuorumPeer self;
 
     public LeaderElection(QuorumPeer self) {
         this.self = self;
@@ -59,7 +59,7 @@ public class LeaderElection implements E
         public int winningCount;
     }
 
-    private ElectionResult countVotes(HashMap<InetSocketAddress, Vote> votes, HashSet<Long> heardFrom) {
+    protected ElectionResult countVotes(HashMap<InetSocketAddress, Vote> votes, HashSet<Long> heardFrom) {
         ElectionResult result = new ElectionResult();
         // Initialize with null vote
         result.vote = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
@@ -145,7 +145,7 @@ public class LeaderElection implements E
             HashMap<InetSocketAddress, Vote> votes =
                 new HashMap<InetSocketAddress, Vote>(self.getVotingView().size());
             int xid = epochGen.nextInt();
-            while (self.running) {
+            while (self.isRunning()) {
                 votes.clear();
                 requestBuffer.clear();
                 requestBuffer.putInt(xid);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java?rev=921201&r1=921200&r2=921201&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java Wed Mar 10 01:27:14 2010
@@ -71,7 +71,7 @@ public class Observer extends Learner{  
                 
                 syncWithLeader(newLeaderZxid);
                 QuorumPacket qp = new QuorumPacket();
-                while (self.running) {
+                while (self.isRunning()) {
                     readPacket(qp);
                     processPacket(qp);                   
                 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=921201&r1=921200&r2=921201&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Mar 10 01:27:14 2010
@@ -495,7 +495,7 @@ public class QuorumPeer extends Thread i
                 this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
     }
 
-    private Election createElectionAlgorithm(int electionAlgorithm){
+    protected Election createElectionAlgorithm(int electionAlgorithm){
         Election le=null;
                 
         //TODO: use a factory rather than a switch
@@ -891,4 +891,12 @@ public class QuorumPeer extends Thread i
     public void setZKDatabase(ZKDatabase database) {
         this.zkDb = database;
     }
+
+    public void setRunning(boolean running) {
+        this.running = running;
+    }
+
+    public boolean isRunning() {
+        return running;
+    }
 }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java?rev=921201&r1=921200&r2=921201&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java Wed Mar 10 01:27:14 2010
@@ -23,23 +23,212 @@ import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.LeaderElection;
+import org.apache.zookeeper.server.quorum.LeaderElectionBean;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.LeaderElection.ElectionResult;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.junit.Test;
 
 /**
  * Tests that a particular run of LeaderElection terminates correctly.
  */
 public class LENonTerminateTest extends TestCase {
+    public class MockLeaderElection extends LeaderElection {
+        public MockLeaderElection(QuorumPeer self) {
+            super(self);            
+        }
+
+        /**
+         * Temporary for 3.3.0 - we want to ensure that a round of voting happens
+         * before any of the peers update their votes. The easiest way to do that
+         * is to add a latch that all wait on after counting their votes. 
+         * 
+         * In 3.4.0 we intend to make this class more testable, and therefore
+         * there should be much less duplicated code.
+         * 
+         * JMX bean method calls are removed to reduce noise.
+         */
+        public Vote lookForLeader() throws InterruptedException {            
+            self.setCurrentVote(new Vote(self.getId(),
+                    self.getLastLoggedZxid()));
+            // We are going to look for a leader by casting a vote for ourself
+            byte requestBytes[] = new byte[4];
+            ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+            byte responseBytes[] = new byte[28];
+            ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
+            /* The current vote for the leader. Initially me! */
+            DatagramSocket s = null;
+            try {
+                s = new DatagramSocket();
+                s.setSoTimeout(200);
+            } catch (SocketException e1) {
+                LOG.error("Socket exception when creating socket for leader election", e1);
+                System.exit(4);
+            }
+            DatagramPacket requestPacket = new DatagramPacket(requestBytes,
+                    requestBytes.length);
+            DatagramPacket responsePacket = new DatagramPacket(responseBytes,
+                    responseBytes.length);
+            HashMap<InetSocketAddress, Vote> votes =
+                new HashMap<InetSocketAddress, Vote>(self.getVotingView().size());
+            int xid = epochGen.nextInt();
+            while (self.isRunning()) {
+                votes.clear();
+                requestBuffer.clear();
+                requestBuffer.putInt(xid);
+                requestPacket.setLength(4);
+                HashSet<Long> heardFrom = new HashSet<Long>();
+                for (QuorumServer server : self.getVotingView().values()) {
+                    LOG.info("Server address: " + server.addr);
+                    try {
+                        requestPacket.setSocketAddress(server.addr);
+                    } catch (IllegalArgumentException e) {
+                        // Sun doesn't include the address that causes this
+                        // exception to be thrown, so we wrap the exception
+                        // in order to capture this critical detail.
+                        throw new IllegalArgumentException(
+                                "Unable to set socket address on packet, msg:"
+                                + e.getMessage() + " with addr:" + server.addr,
+                                e);
+                    }
+
+                    try {
+                        s.send(requestPacket);
+                        responsePacket.setLength(responseBytes.length);
+                        s.receive(responsePacket);
+                        if (responsePacket.getLength() != responseBytes.length) {
+                            LOG.error("Got a short response: "
+                                    + responsePacket.getLength());
+                            continue;
+                        }
+                        responseBuffer.clear();
+                        int recvedXid = responseBuffer.getInt();
+                        if (recvedXid != xid) {
+                            LOG.error("Got bad xid: expected " + xid
+                                    + " got " + recvedXid);
+                            continue;
+                        }
+                        long peerId = responseBuffer.getLong();
+                        heardFrom.add(peerId);
+                        //if(server.id != peerId){
+                        Vote vote = new Vote(responseBuffer.getLong(),
+                                responseBuffer.getLong());
+                        InetSocketAddress addr =
+                            (InetSocketAddress) responsePacket
+                            .getSocketAddress();
+                        votes.put(addr, vote);
+                        //}
+                    } catch (IOException e) {
+                        LOG.warn("Ignoring exception while looking for leader",
+                                e);
+                        // Errors are okay, since hosts may be
+                        // down
+                    }
+                }
+
+                ElectionResult result = countVotes(votes, heardFrom);
+                
+                /**
+                 * This is the only difference from LeaderElection - wait for
+                 * this latch on the first time through this method. This ensures
+                 * that the first round of voting happens before setCurrentVote
+                 * is called below.
+                 */
+                LOG.info("Waiting for first round of voting to complete");
+                latch.countDown();
+                assertTrue("Thread timed out waiting for latch",
+                        latch.await(10000, TimeUnit.MILLISECONDS));
+                
+                // ZOOKEEPER-569:
+                // If no votes are received for live peers, reset to voting 
+                // for ourselves as otherwise we may hang on to a vote 
+                // for a dead peer                 
+                if (votes.size() == 0) {                    
+                    self.setCurrentVote(new Vote(self.getId(),
+                            self.getLastLoggedZxid()));
+                } else {
+                    if (result.winner.id >= 0) {
+                        self.setCurrentVote(result.vote);
+                        // To do: this doesn't use a quorum verifier
+                        if (result.winningCount > (self.getVotingView().size() / 2)) {
+                            self.setCurrentVote(result.winner);
+                            s.close();
+                            Vote current = self.getCurrentVote();
+                            LOG.info("Found leader: my type is: " + self.getPeerType());
+                            /*
+                             * We want to make sure we implement the state machine
+                             * correctly. If we are a PARTICIPANT, once a leader
+                             * is elected we can move either to LEADING or 
+                             * FOLLOWING. However if we are an OBSERVER, it is an
+                             * error to be elected as a Leader.
+                             */
+                            if (self.getPeerType() == LearnerType.OBSERVER) {
+                                if (current.id == self.getId()) {
+                                    // This should never happen!
+                                    LOG.error("OBSERVER elected as leader!");
+                                    Thread.sleep(100);
+                                }
+                                else {
+                                    self.setPeerState(ServerState.OBSERVING);
+                                    Thread.sleep(100);
+                                    return current;
+                                }
+                            } else {
+                                self.setPeerState((current.id == self.getId())
+                                        ? ServerState.LEADING: ServerState.FOLLOWING);
+                                if (self.getPeerState() == ServerState.FOLLOWING) {
+                                    Thread.sleep(100);
+                                }                            
+                                return current;
+                            }
+                        }
+                    }
+                }
+                Thread.sleep(1000);
+            }
+            return null;
+        }         
+    }
+    
+    public class MockQuorumPeer extends QuorumPeer {
+        public MockQuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
+                File logDir, int clientPort, int electionAlg,
+                long myid, int tickTime, int initLimit, int syncLimit)
+        throws IOException
+        {
+            super(quorumPeers, snapDir, logDir, electionAlg,
+                    myid,tickTime, initLimit,syncLimit,
+                    new NIOServerCnxn.Factory(new InetSocketAddress(clientPort)),
+                    new QuorumMaj(countParticipants(quorumPeers)));
+        }
+        
+        protected  Election createElectionAlgorithm(int electionAlgorithm){
+            LOG.info("Returning mocked leader election");
+            return new MockLeaderElection(this);
+        }
+    }
+    
+    
     protected static final Logger LOG = Logger.getLogger(FLELostMessageTest.class);
     
     int count;
@@ -62,11 +251,14 @@ public class LENonTerminateTest extends 
     public void tearDown() throws Exception {
         LOG.info("FINISHED " + getName());
     }
-
+    
+    static final CountDownLatch latch = new CountDownLatch(2);
+    static final CountDownLatch mockLatch = new CountDownLatch(1);
 
     class LEThread extends Thread {
         int i;
         QuorumPeer peer;
+        
 
         LEThread(QuorumPeer peer, int i) {
             this.i = i;
@@ -84,7 +276,7 @@ public class LENonTerminateTest extends 
 
                 if (v == null){
                     fail("Thread " + i + " got a null vote");
-                }
+                }                                
 
                 /*
                  * A real zookeeper would take care of setting the current vote. Here
@@ -125,15 +317,13 @@ public class LENonTerminateTest extends 
         /*
          * peer1 and peer2 are A and B in the above example. 
          */
-        QuorumPeer peer1 = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 0, 0, 2, 2, 2);
+        QuorumPeer peer1 = new MockQuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 0, 0, 2, 2, 2);
         peer1.startLeaderElection();
         LEThread thread1 = new LEThread(peer1, 0);
-        thread1.start();
         
-        QuorumPeer peer2 = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 0, 1, 2, 2, 2);
+        QuorumPeer peer2 = new MockQuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 0, 1, 2, 2, 2);
         peer2.startLeaderElection();        
         LEThread thread2 = new LEThread(peer2, 1);
-        thread2.start();
                             
         /*
          * Start mock server.
@@ -150,6 +340,10 @@ public class LENonTerminateTest extends 
         };        
         
         thread3.start();
+        assertTrue("mockServer did not start in 5s",
+                mockLatch.await(5000, TimeUnit.MILLISECONDS));
+        thread1.start();
+        thread2.start();
         /*
          * Occasionally seen false negatives with a 5s timeout.
          */
@@ -171,9 +365,14 @@ public class LENonTerminateTest extends 
         DatagramPacket packet = new DatagramPacket(b, b.length);
         QuorumServer server = peers.get(Long.valueOf(2));
         DatagramSocket udpSocket = new DatagramSocket(server.addr.getPort());
+        LOG.info("In MockServer");
+        mockLatch.countDown();
         Vote current = new Vote(2, 1);
         for (int i=0;i<2;++i) {
             udpSocket.receive(packet);
+            responseBuffer.rewind();
+            LOG.info("Received " + responseBuffer.getInt() + " " + responseBuffer.getLong() + " " + responseBuffer.getLong());
+            LOG.info("From " + packet.getSocketAddress());
             responseBuffer.clear();
             responseBuffer.getInt(); // Skip the xid
             responseBuffer.putLong(2);



Mime
View raw message