Return-Path: Delivered-To: apmail-hadoop-zookeeper-commits-archive@minotaur.apache.org Received: (qmail 66296 invoked from network); 29 Jan 2009 23:11:30 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 29 Jan 2009 23:11:30 -0000 Received: (qmail 1481 invoked by uid 500); 29 Jan 2009 23:11:30 -0000 Delivered-To: apmail-hadoop-zookeeper-commits-archive@hadoop.apache.org Received: (qmail 1461 invoked by uid 500); 29 Jan 2009 23:11:29 -0000 Mailing-List: contact zookeeper-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: zookeeper-dev@ Delivered-To: mailing list zookeeper-commits@hadoop.apache.org Received: (qmail 1450 invoked by uid 99); 29 Jan 2009 23:11:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Jan 2009 15:11:29 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Jan 2009 23:11:23 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8E1922388ACB; Thu, 29 Jan 2009 23:11:03 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r739075 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/ Date: Thu, 29 Jan 2009 23:11:03 -0000 To: zookeeper-commits@hadoop.apache.org From: mahadev@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090129231103.8E1922388ACB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mahadev Date: Thu Jan 29 23:11:02 2009 New Revision: 739075 URL: http://svn.apache.org/viewvc?rev=739075&view=rev Log: ZOOKEEPER-275. Bug in FastLeaderElection. 
(flavio via mahadev) Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=739075&r1=739074&r2=739075&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Thu Jan 29 23:11:02 2009 @@ -74,6 +74,8 @@ ZOOKEEPER-267. java client incorrectly generating syncdisconnected event when in disconnected state. (pat via breed) ZOOKEEPER-263. document connection host:port as comma separated list in forrest docs (pat via breed) + + ZOOKEEPER-275. Bug in FastLeaderElection. 
(flavio via mahadev) IMPROVEMENTS: Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=739075&r1=739074&r2=739075&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Thu Jan 29 23:11:02 2009 @@ -186,6 +186,7 @@ response = manager.recvQueue.take(); // Receive new message + LOG.debug("Receive new message."); if (response.buffer.capacity() < 28) { LOG.error("Got a short response: " + response.buffer.capacity()); @@ -246,7 +247,7 @@ Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ - + LOG.info("Sending new notification."); ToSend notmsg = new ToSend( ToSend.mType.notification, current.id, @@ -395,11 +396,16 @@ private void leaveInstance() { recvqueue.clear(); } - + + public QuorumCnxManager getCnxManager(){ + return manager; + } + public void shutdown(){ manager.halt(); } + /** * Send notifications to all peers upon a change in our vote */ @@ -425,9 +431,10 @@ * @param id Server identifier * @param zxid Last zxid observed by the issuer of this vote */ - private boolean totalOrderPredicate(long id, long zxid) { - if ((zxid > proposedZxid) - || ((zxid == proposedZxid) && (id > proposedLeader))) + private boolean totalOrderPredicate(long newId, long newZxid, long curId, long curZxid) { + LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: " + newZxid + ", proposed zxid: " + curZxid); + if ((newZxid > curZxid) + || ((newZxid == curZxid) && (newId > curId))) return true; else return false; @@ -557,15 +564,21 @@ if (n.epoch > logicalclock) { logicalclock = n.epoch; recvset.clear(); - 
updateProposal(self.getId(), self.getLastLoggedZxid()); + if(totalOrderPredicate(n.leader, n.zxid, self.getId(), self.getLastLoggedZxid())) + updateProposal(n.leader, n.zxid); + else + updateProposal(self.getId(), self.getLastLoggedZxid()); sendNotifications(); } else if (n.epoch < logicalclock) { + LOG.info("n.epoch < logicalclock"); break; - } else if (totalOrderPredicate(n.leader, n.zxid)) { + } else if (totalOrderPredicate(n.leader, n.zxid, proposedLeader, proposedZxid)) { + LOG.info("Updating proposal"); updateProposal(n.leader, n.zxid); sendNotifications(); } + LOG.info("Adding vote"); recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch)); //If have received from all nodes, then terminate @@ -581,7 +594,7 @@ // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ - if(totalOrderPredicate(n.leader, n.zxid)){ + if(totalOrderPredicate(n.leader, n.zxid, proposedLeader, proposedZxid)){ recvqueue.put(n); break; } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=739075&r1=739074&r2=739075&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Thu Jan 29 23:11:02 2009 @@ -128,7 +128,7 @@ // Generates a challenge to guarantee one connection between pairs of // servers - genChallenge(); + //genChallenge(); // Starts listener thread that waits for connection requests listener = new Listener(); @@ -364,11 +364,12 @@ */ boolean haveDelivered() { for (ArrayBlockingQueue queue : queueSendMap.values()) { - if (queue.size() != 0) - return false; + LOG.debug("Queue size: " + 
queue.size()); + if (queue.size() == 0) + return true; } - return true; + return false; } /** @@ -376,13 +377,17 @@ */ public void halt() { shutdown = true; - LOG.info("Halting listener"); + LOG.debug("Halting listener"); listener.halt(); - for(SendWorker sw: senderWorkerMap.values()){ - LOG.info("Halting sender: " + sw); - sw.finish(); - } + softHalt(); + } + + public void softHalt(){ + for(SendWorker sw: senderWorkerMap.values()){ + LOG.debug("Halting sender: " + sw); + sw.finish(); + } } /** @@ -401,6 +406,7 @@ ss = ServerSocketChannel.open(); int port = self.quorumPeers.get(self.getId()).electionAddr.getPort(); LOG.info("My election bind port: " + port); + ss.socket().setReuseAddress(true); ss.socket().bind(new InetSocketAddress(port)); while (!shutdown) { @@ -410,6 +416,8 @@ LOG.info("Connection request " + sock.getRemoteSocketAddress()); + //synchronized(senderWorkerMap){ + LOG.info("Connection request: " + self.getId()); receiveConnection(client); } } catch (IOException e) { @@ -419,7 +427,7 @@ void halt(){ try{ - if(ss != null) ss.close(); + if((ss != null) && (ss.isOpen())) ss.close(); } catch (IOException e){ LOG.warn("Exception when shutting down listener: " + e); } @@ -453,6 +461,7 @@ boolean finish() { running = false; + LOG.debug("Calling finish"); this.interrupt(); if (recvWorker != null) recvWorker.finish(); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=739075&r1=739074&r2=739075&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Jan 29 23:11:02 2009 @@ -483,7 +483,9 @@ follower.shutdown(); } cnxnFactory.shutdown(); - udpSocket.close(); + 
if(udpSocket != null) { + udpSocket.close(); + } } public String[] getQuorumPeers() { Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java?rev=739075&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java (added) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java Thu Jan 29 23:11:02 2009 @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.test; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Random; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.server.quorum.FastLeaderElection; +import org.apache.zookeeper.server.quorum.QuorumCnxManager; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumStats; +import org.apache.zookeeper.server.quorum.Vote; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; + +import junit.framework.TestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class FLENewEpochTest extends TestCase { + protected static final Logger LOG = Logger.getLogger(FLENewEpochTest.class); + + int count; + int baseport; + int baseLEport; + HashMap<Long,QuorumServer> peers; + ArrayList<LEThread> threads; + File tmpdir[]; + int port[]; + int[] round; + + @Override + public void setUp() throws Exception { + count = 3; + baseport= 33303; + baseLEport = 43303; + + peers = new HashMap<Long,QuorumServer>(count); + threads = new ArrayList<LEThread>(count); + tmpdir = new File[count]; + port = new int[count]; + + round = new int[3]; + round[0] = 0; + round[1] = 0; + round[2] = 0; + LOG.info("SetUp " + getName()); + } + + @Override + public void tearDown() throws Exception { + for(int i = 0; i < threads.size(); i++) { + ((FastLeaderElection) threads.get(i).peer.getElectionAlg()).shutdown(); + } + LOG.info("FINISHED " + getName()); + } + + + class LEThread extends Thread { + FastLeaderElection le; + int i; + QuorumPeer peer; + + LEThread(QuorumPeer peer, int i) { + this.i = i; + this.peer = peer; + LOG.info("Constructor: " + getName()); + + } + + public void run(){ + boolean flag = true; + try{ + while(flag){ + Vote v = null; + peer.setPeerState(ServerState.LOOKING); + LOG.info("Going to call leader election again: " + i); + v = 
peer.getElectionAlg().lookForLeader(); + + if(v == null){ + fail("Thread " + i + " got a null vote"); + } + + /* + * A real zookeeper would take care of setting the current vote. Here + * we do it manually. + */ + peer.setCurrentVote(v); + + LOG.info("Finished election: " + i + ", " + v.id); + //votes[i] = v; + + switch(i){ + case 0: + LOG.info("First peer, do nothing, just join"); + flag = false; + break; + case 1: + LOG.info("Second entering case"); + if(round[1] != 0) flag = false; + else{ + while(round[2] == 0){ + Thread.sleep(200); + } + } + LOG.info("Second is going to start second round"); + round[1]++; + break; + case 2: + LOG.info("Third peer, shutting it down"); + ((FastLeaderElection) peer.getElectionAlg()).shutdown(); + peer.shutdown(); + flag = false; + round[2] = 1; + LOG.info("Third leaving"); + break; + } + } + } catch (Exception e){ + e.printStackTrace(); + } + } + } + + + @Test + public void testLENewEpoch() throws Exception { + + FastLeaderElection le[] = new FastLeaderElection[count]; + + LOG.info("TestLE: " + getName()+ ", " + count); + for(int i = 0; i < count; i++) { + peers.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress(baseport+100+i), + new InetSocketAddress(baseLEport+100+i))); + tmpdir[i] = File.createTempFile("letest", "test"); + port[i] = baseport+i; + } + + for(int i = 1; i < le.length; i++) { + QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2); + peer.startLeaderElection(); + LEThread thread = new LEThread(peer, i); + thread.start(); + threads.add(thread); + } + Thread.sleep(2000); + QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2); + peer.startLeaderElection(); + LEThread thread = new LEThread(peer, 0); + thread.start(); + threads.add(thread); + + LOG.info("Started threads " + getName()); + + for(int i = 0; i < threads.size(); i++) { + threads.get(i).join(10000); + if (threads.get(i).isAlive()) { + fail("Threads didn't join"); + } + + } + } + }