From zookeeper-commits-return-307-apmail-hadoop-zookeeper-commits-archive=hadoop.apache.org@hadoop.apache.org Thu Apr 16 23:00:20 2009 Return-Path: Delivered-To: apmail-hadoop-zookeeper-commits-archive@minotaur.apache.org Received: (qmail 61396 invoked from network); 16 Apr 2009 23:00:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 16 Apr 2009 23:00:20 -0000 Received: (qmail 95185 invoked by uid 500); 16 Apr 2009 23:00:20 -0000 Delivered-To: apmail-hadoop-zookeeper-commits-archive@hadoop.apache.org Received: (qmail 95157 invoked by uid 500); 16 Apr 2009 23:00:20 -0000 Mailing-List: contact zookeeper-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: zookeeper-dev@ Delivered-To: mailing list zookeeper-commits@hadoop.apache.org Received: (qmail 95147 invoked by uid 99); 16 Apr 2009 23:00:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Apr 2009 23:00:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Apr 2009 23:00:12 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B756023889D0; Thu, 16 Apr 2009 22:59:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r765797 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/ Date: Thu, 16 Apr 2009 22:59:50 -0000 To: zookeeper-commits@hadoop.apache.org From: mahadev@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090416225950.B756023889D0@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mahadev Date: Thu Apr 16 22:59:49 2009 New Revision: 765797 URL: http://svn.apache.org/viewvc?rev=765797&view=rev Log: ZOOKEEPER-337. improve logging in leader election lookForLeader method when address resolution fails (phunt via mahadev) Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/IntegrityCheck.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Thu Apr 16 22:59:49 2009 @@ -51,6 +51,9 @@ ZOOKEEPER-374. Uninitialized struct variable in C causes warning which is treated as an error (phunt via mahadev) + ZOOKEEPER-337. improve logging in leader election lookForLeader method when +address resolution fails (phunt via mahadev) + IMPROVEMENTS: ZOOKEEPER-308. improve the atomic broadcast performance 3x. (breed via mahadev) Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Thu Apr 16 22:59:49 2009 @@ -32,6 +32,7 @@ import org.apache.log4j.Logger; +import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.quorum.Election; import org.apache.zookeeper.server.quorum.Vote; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; @@ -452,7 +453,17 @@ requestBuffer.put(zeroes); requestPacket.setLength(48); - requestPacket.setSocketAddress(m.addr); + try { + requestPacket.setSocketAddress(m.addr); + } catch (IllegalArgumentException e) { + // Sun doesn't include the address that causes this + // exception to be thrown, so we wrap the exception + // in order to capture this critical detail. + throw new IllegalArgumentException( + "Unable to set socket address on packet, msg:" + + e.getMessage() + " with addr:" + m.addr, + e); + } try { if (challengeMap.get(m.tag) == null) { @@ -486,7 +497,18 @@ requestBuffer.put(zeroes); requestPacket.setLength(48); - requestPacket.setSocketAddress(m.addr); + try { + requestPacket.setSocketAddress(m.addr); + } catch (IllegalArgumentException e) { + // Sun doesn't include the address that causes this + // exception to be thrown, so we wrap the exception + // in order to capture this critical detail. + throw new IllegalArgumentException( + "Unable to set socket address on packet, msg:" + + e.getMessage() + " with addr:" + m.addr, + e); + } + try { mySocket.send(requestPacket); @@ -512,7 +534,18 @@ requestBuffer.put(zeroes); requestPacket.setLength(48); - requestPacket.setSocketAddress(m.addr); + try { + requestPacket.setSocketAddress(m.addr); + } catch (IllegalArgumentException e) { + // Sun doesn't include the address that causes this + // exception to be thrown, so we wrap the exception + // in order to capture this critical detail. + throw new IllegalArgumentException( + "Unable to set socket address on packet, msg:" + + e.getMessage() + " with addr:" + m.addr, + e); + } + boolean myChallenge = false; boolean myAck = false; @@ -629,7 +662,18 @@ requestBuffer.putLong(m.epoch); requestPacket.setLength(48); - requestPacket.setSocketAddress(m.addr); + try { + requestPacket.setSocketAddress(m.addr); + } catch (IllegalArgumentException e) { + // Sun doesn't include the address that causes this + // exception to be thrown, so we wrap the exception + // in order to capture this critical detail. + throw new IllegalArgumentException( + "Unable to set socket address on packet, msg:" + + e.getMessage() + " with addr:" + m.addr, + e); + } + try { mySocket.send(requestPacket); @@ -764,129 +808,152 @@ } public Vote lookForLeader() throws InterruptedException { - HashMap recvset = - new HashMap(); - - HashMap outofelection = - new HashMap(); - - logicalclock++; - - proposedLeader = self.getId(); - proposedZxid = self.getLastLoggedZxid(); - - LOG.info("Election tally"); - sendNotifications(); - - /* - * Loop in which we exchange notifications until we find a leader - */ - - while (self.getPeerState() == ServerState.LOOKING) { - /* - * Remove next notification from queue, times out after 2 times the - * termination time - */ - Notification n = recvqueue.poll(2 * finalizeWait, - TimeUnit.MILLISECONDS); + try { + self.jmxLeaderElectionBean = new LeaderElectionBean(); + MBeanRegistry.getInstance().register( + self.jmxLeaderElectionBean, self.jmxLocalPeerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + self.jmxLeaderElectionBean = null; + } + try { + HashMap recvset = + new HashMap(); + + HashMap outofelection = + new HashMap(); + + logicalclock++; + + proposedLeader = self.getId(); + proposedZxid = self.getLastLoggedZxid(); + + LOG.info("Election tally"); + sendNotifications(); + /* - * Sends more notifications if haven't received enough. Otherwise - * processes new notification. + * Loop in which we exchange notifications until we find a leader */ - if (n == null) { - if (((!outofelection.isEmpty()) || (recvset.size() > 1))) - sendNotifications(); - } else - switch (n.state) { - case LOOKING: - if (n.epoch > logicalclock) { - logicalclock = n.epoch; - recvset.clear(); - if (totalOrderPredicate(n.leader, n.zxid)) { + + while (self.getPeerState() == ServerState.LOOKING) { + /* + * Remove next notification from queue, times out after 2 times + * the termination time + */ + Notification n = recvqueue.poll(2 * finalizeWait, + TimeUnit.MILLISECONDS); + + /* + * Sends more notifications if haven't received enough. + * Otherwise processes new notification. + */ + if (n == null) { + if (((!outofelection.isEmpty()) || (recvset.size() > 1))) + sendNotifications(); + } else + switch (n.state) { + case LOOKING: + if (n.epoch > logicalclock) { + logicalclock = n.epoch; + recvset.clear(); + if (totalOrderPredicate(n.leader, n.zxid)) { + proposedLeader = n.leader; + proposedZxid = n.zxid; + } + sendNotifications(); + } else if (n.epoch < logicalclock) { + break; + } else if (totalOrderPredicate(n.leader, n.zxid)) { proposedLeader = n.leader; proposedZxid = n.zxid; + + sendNotifications(); } - sendNotifications(); - } else if (n.epoch < logicalclock) { - break; - } else if (totalOrderPredicate(n.leader, n.zxid)) { - proposedLeader = n.leader; - proposedZxid = n.zxid; - - sendNotifications(); - } - - recvset.put(n.addr, new Vote(n.leader, n.zxid)); - - // If have received from all nodes, then terminate - if (self.quorumPeers.size() == recvset.size()) { - self.setPeerState((proposedLeader == self.getId()) ? - ServerState.LEADING: ServerState.FOLLOWING); - // if (self.state == ServerState.FOLLOWING) { - // Thread.sleep(100); - // } - leaveInstance(); - return new Vote(proposedLeader, proposedZxid); - - } else if (termPredicate(recvset, proposedLeader, - proposedZxid)) { - // Otherwise, wait for a fixed amount of time - LOG.info("Passed predicate"); - Thread.sleep(finalizeWait); - - // Notification probe = recvqueue.peek(); - - // Verify if there is any change in the proposed leader - while ((!recvqueue.isEmpty()) - && !totalOrderPredicate( - recvqueue.peek().leader, recvqueue - .peek().zxid)) { - recvqueue.poll(); - } - if (recvqueue.isEmpty()) { - // LOG.warn("Proposed leader: " + - // proposedLeader); + + recvset.put(n.addr, new Vote(n.leader, n.zxid)); + + // If have received from all nodes, then terminate + if (self.quorumPeers.size() == recvset.size()) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: ServerState.FOLLOWING); // if (self.state == ServerState.FOLLOWING) { // Thread.sleep(100); // } - leaveInstance(); return new Vote(proposedLeader, proposedZxid); + + } else if (termPredicate(recvset, proposedLeader, + proposedZxid)) { + // Otherwise, wait for a fixed amount of time + LOG.info("Passed predicate"); + Thread.sleep(finalizeWait); + + // Notification probe = recvqueue.peek(); + + // Verify if there is any change in the proposed leader + while ((!recvqueue.isEmpty()) + && !totalOrderPredicate( + recvqueue.peek().leader, recvqueue + .peek().zxid)) { + recvqueue.poll(); + } + if (recvqueue.isEmpty()) { + // LOG.warn("Proposed leader: " + + // proposedLeader); + self.setPeerState( + (proposedLeader == self.getId()) ? + ServerState.LEADING : + ServerState.FOLLOWING); + // if (self.state == ServerState.FOLLOWING) { + // Thread.sleep(100); + // } + + leaveInstance(); + return new Vote(proposedLeader, proposedZxid); + } } + break; + case LEADING: + outofelection.put(n.addr, new Vote(n.leader, n.zxid)); + + if (termPredicate(outofelection, n.leader, n.zxid)) { + + self.setPeerState((n.leader == self.getId()) ? + ServerState.LEADING: ServerState.FOLLOWING); + + leaveInstance(); + return new Vote(n.leader, n.zxid); + } + break; + case FOLLOWING: + outofelection.put(n.addr, new Vote(n.leader, n.zxid)); + + if (termPredicate(outofelection, n.leader, n.zxid)) { + + self.setPeerState((n.leader == self.getId()) ? + ServerState.LEADING: ServerState.FOLLOWING); + + leaveInstance(); + return new Vote(n.leader, n.zxid); + } + break; + default: + break; } - break; - case LEADING: - outofelection.put(n.addr, new Vote(n.leader, n.zxid)); - - if (termPredicate(outofelection, n.leader, n.zxid)) { - - self.setPeerState((n.leader == self.getId()) ? - ServerState.LEADING: ServerState.FOLLOWING); - - leaveInstance(); - return new Vote(n.leader, n.zxid); - } - break; - case FOLLOWING: - outofelection.put(n.addr, new Vote(n.leader, n.zxid)); - - if (termPredicate(outofelection, n.leader, n.zxid)) { - - self.setPeerState((n.leader == self.getId()) ? - ServerState.LEADING: ServerState.FOLLOWING); - - leaveInstance(); - return new Vote(n.leader, n.zxid); - } - break; - default: - break; + } + + return null; + } finally { + try { + if(self.jmxLeaderElectionBean != null){ + MBeanRegistry.getInstance().unregister( + self.jmxLeaderElectionBean); } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + self.jmxLeaderElectionBean = null; } - - return null; } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Thu Apr 16 22:59:49 2009 @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; +import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; @@ -510,146 +511,183 @@ * sends notifications to al other peers. */ public Vote lookForLeader() throws InterruptedException { - HashMap recvset = new HashMap(); - - HashMap outofelection = new HashMap(); - - int notTimeout = finalizeWait; - - synchronized(this){ - logicalclock++; - updateProposal(self.getId(), self.getLastLoggedZxid()); + try { + self.jmxLeaderElectionBean = new LeaderElectionBean(); + MBeanRegistry.getInstance().register( + self.jmxLeaderElectionBean, self.jmxLocalPeerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + self.jmxLeaderElectionBean = null; } - - LOG.info("New election: " + proposedZxid); - sendNotifications(); - - /* - * Loop in which we exchange notifications until we find a leader - */ - while (self.getPeerState() == ServerState.LOOKING) { - /* - * Remove next notification from queue, times out after 2 times - * the termination time - */ - Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); + try { + HashMap recvset = new HashMap(); + + HashMap outofelection = new HashMap(); + + int notTimeout = finalizeWait; + + synchronized(this){ + logicalclock++; + updateProposal(self.getId(), self.getLastLoggedZxid()); + } + LOG.info("New election: " + proposedZxid); + sendNotifications(); + /* - * Sends more notifications if haven't received enough. - * Otherwise processes new notification. + * Loop in which we exchange notifications until we find a leader */ - if(n == null){ - if(manager.haveDelivered()){ - sendNotifications(); - } else { - manager.connectAll(); - } - - /* - * Exponential backoff - */ - int tmpTimeOut = notTimeout*2; - notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); - LOG.info("Notification time out: " + notTimeout); - } - else { - //notTimeout = finalizeWait; - switch (n.state) { - case LOOKING: - // If notification > current, replace and send messages out - LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + - n.epoch + ", " + self.getId() + ", " + self.getPeerState() + - ", " + n.state + ", " + n.sid); - if (n.epoch > logicalclock) { - LOG.debug("Increasing logical clock: " + n.epoch); - logicalclock = n.epoch; - recvset.clear(); - if(totalOrderPredicate(n.leader, n.zxid, self.getId(), self.getLastLoggedZxid())) - updateProposal(n.leader, n.zxid); - else - updateProposal(self.getId(), self.getLastLoggedZxid()); - sendNotifications(); - } else if (n.epoch < logicalclock) { - LOG.info("n.epoch < logicalclock"); - break; - } else if (totalOrderPredicate(n.leader, n.zxid, proposedLeader, proposedZxid)) { - LOG.info("Updating proposal"); - updateProposal(n.leader, n.zxid); - sendNotifications(); - } + + while (self.getPeerState() == ServerState.LOOKING) { + /* + * Remove next notification from queue, times out after 2 times + * the termination time + */ + Notification n = recvqueue.poll(notTimeout, + TimeUnit.MILLISECONDS); - LOG.info("Adding vote"); - recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch)); - - //If have received from all nodes, then terminate - if (self.quorumPeers.size() == recvset.size()) { - self.setPeerState((proposedLeader == self.getId()) ? - ServerState.LEADING: ServerState.FOLLOWING); - leaveInstance(); - return new Vote(proposedLeader, proposedZxid); - - } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) { - //Otherwise, wait for a fixed amount of time - LOG.debug("Passed predicate"); - - // Verify if there is any change in the proposed leader - while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ - if(totalOrderPredicate(n.leader, n.zxid, proposedLeader, proposedZxid)){ - recvqueue.put(n); - break; - } + /* + * Sends more notifications if haven't received enough. + * Otherwise processes new notification. + */ + if(n == null){ + if(manager.haveDelivered()){ + sendNotifications(); + } else { + manager.connectAll(); + } + + /* + * Exponential backoff + */ + int tmpTimeOut = notTimeout*2; + notTimeout = (tmpTimeOut < maxNotificationInterval? + tmpTimeOut : maxNotificationInterval); + LOG.info("Notification time out: " + notTimeout); + } + else { + //notTimeout = finalizeWait; + switch (n.state) { + case LOOKING: + // If notification > current, replace and send messages out + LOG.info("Notification: " + n.leader + ", " + n.zxid + + ", " + n.epoch + ", " + self.getId() + ", " + + self.getPeerState() + ", " + n.state + ", " + + n.sid); + if (n.epoch > logicalclock) { + logicalclock = n.epoch; + recvset.clear(); + if(totalOrderPredicate(n.leader, n.zxid, + self.getId(), self.getLastLoggedZxid())) + updateProposal(n.leader, n.zxid); + else + updateProposal(self.getId(), + self.getLastLoggedZxid()); + sendNotifications(); + } else if (n.epoch < logicalclock) { + LOG.info("n.epoch < logicalclock"); + break; + } else if (totalOrderPredicate(n.leader, n.zxid, + proposedLeader, proposedZxid)) { + LOG.info("Updating proposal"); + updateProposal(n.leader, n.zxid); + sendNotifications(); } - if (n == null) { + LOG.info("Adding vote"); + recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch)); + + //If have received from all nodes, then terminate + if (self.quorumPeers.size() == recvset.size()) { self.setPeerState((proposedLeader == self.getId()) ? - ServerState.LEADING: ServerState.FOLLOWING); - LOG.info("About to leave instance:" + proposedLeader + ", " + - proposedZxid + ", " + self.getId() + ", " + self.getPeerState()); + ServerState.LEADING: ServerState.FOLLOWING); leaveInstance(); - return new Vote(proposedLeader, - proposedZxid); + return new Vote(proposedLeader, proposedZxid); + + } else if (termPredicate(recvset, + new Vote(proposedLeader, proposedZxid, + logicalclock))) { + //Otherwise, wait for a fixed amount of time + LOG.debug("Passed predicate"); + + // Verify if there is any change in the proposed leader + while((n = recvqueue.poll(finalizeWait, + TimeUnit.MILLISECONDS)) != null){ + if(totalOrderPredicate(n.leader, n.zxid, + proposedLeader, proposedZxid)){ + recvqueue.put(n); + break; + } + } + + if (n == null) { + self.setPeerState((proposedLeader == self.getId()) ? + ServerState.LEADING: ServerState.FOLLOWING); + LOG.info("About to leave instance:" + + proposedLeader + ", " + + proposedZxid + ", " + self.getId() + + ", " + self.getPeerState()); + leaveInstance(); + return new Vote(proposedLeader, + proposedZxid); + } } - } - break; - case LEADING: - /* - * There is at most one leader for each epoch, so if a peer claims to - * be the leader for an epoch, then that peer must be the leader (no - * arbitrary failures assumed). Now, if there is no quorum supporting - * this leader, then processes will naturally move to a new epoch. - */ - if(n.epoch == logicalclock){ - self.setPeerState((n.leader == self.getId()) ? - ServerState.LEADING: ServerState.FOLLOWING); - - leaveInstance(); - return new Vote(n.leader, n.zxid); - } - case FOLLOWING: - LOG.info("Notification: " + n.leader + ", " + n.zxid + - ", " + n.epoch + ", " + self.getId() + ", " + - self.getPeerState() + ", " + n.state + ", " + n.sid); - - outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state)); - - if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state)) - && checkLeader(outofelection, n.leader, n.epoch)) { - synchronized(this){ - logicalclock = n.epoch; + break; + case LEADING: + /* + * There is at most one leader for each epoch, so if a + * peer claims to be the leader for an epoch, then that + * peer must be the leader (no* arbitrary failures + * assumed). Now, if there is no quorum supporting + * this leader, then processes will naturally move + * to a new epoch. + */ + if(n.epoch == logicalclock){ self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: ServerState.FOLLOWING); + + leaveInstance(); + return new Vote(n.leader, n.zxid); + } + case FOLLOWING: + LOG.info("Notification: " + n.leader + ", " + n.zxid + + ", " + n.epoch + ", " + self.getId() + ", " + + self.getPeerState() + ", " + n.state + ", " + + n.sid); + + outofelection.put(n.sid, new Vote(n.leader, n.zxid, + n.epoch, n.state)); + + if (termPredicate(outofelection, new Vote(n.leader, + n.zxid, n.epoch, n.state)) + && checkLeader(outofelection, n.leader, n.epoch)) { + synchronized(this){ + logicalclock = n.epoch; + self.setPeerState((n.leader == self.getId()) ? + ServerState.LEADING: ServerState.FOLLOWING); + } + leaveInstance(); + return new Vote(n.leader, n.zxid); } - leaveInstance(); - return new Vote(n.leader, n.zxid); + break; + default: + break; } - break; - default: - break; } } + + return null; + } finally { + try { + if(self.jmxLeaderElectionBean != null){ + MBeanRegistry.getInstance().unregister( + self.jmxLeaderElectionBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + self.jmxLeaderElectionBean = null; } - - return null; } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Thu Apr 16 22:59:49 2009 @@ -33,6 +33,7 @@ import org.apache.log4j.Logger; +import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.quorum.Vote; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; @@ -40,7 +41,7 @@ public class LeaderElection implements Election { private static final Logger LOG = Logger.getLogger(LeaderElection.class); private static Random epochGen = new Random(); - + QuorumPeer self; public LeaderElection(QuorumPeer self) { @@ -80,7 +81,7 @@ } } } - + HashMap countTable = new HashMap(); // Now do the tally for (Vote v : votesCast) { @@ -110,85 +111,120 @@ } public Vote lookForLeader() throws InterruptedException { - self.setCurrentVote(new Vote(self.getId(), self.getLastLoggedZxid())); - // We are going to look for a leader by casting a vote for ourself - byte requestBytes[] = new byte[4]; - ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); - byte responseBytes[] = new byte[28]; - ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes); - /* The current vote for the leader. Initially me! */ - DatagramSocket s = null; try { - s = new DatagramSocket(); - s.setSoTimeout(200); - } catch (SocketException e1) { - e1.printStackTrace(); - System.exit(4); + self.jmxLeaderElectionBean = new LeaderElectionBean(); + MBeanRegistry.getInstance().register( + self.jmxLeaderElectionBean, self.jmxLocalPeerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + self.jmxLeaderElectionBean = null; } - DatagramPacket requestPacket = new DatagramPacket(requestBytes, - requestBytes.length); - DatagramPacket responsePacket = new DatagramPacket(responseBytes, - responseBytes.length); - HashMap votes = new HashMap( - self.quorumPeers.size()); - int xid = epochGen.nextInt(); - while (self.running) { - votes.clear(); - requestBuffer.clear(); - requestBuffer.putInt(xid); - requestPacket.setLength(4); - HashSet heardFrom = new HashSet(); - for (QuorumServer server : self.quorumPeers.values()) { - requestPacket.setSocketAddress(server.addr); - LOG.info("Server address: " + server.addr); - try { - s.send(requestPacket); - responsePacket.setLength(responseBytes.length); - s.receive(responsePacket); - if (responsePacket.getLength() != responseBytes.length) { - LOG.error("Got a short response: " - + responsePacket.getLength()); - continue; + + try { + self.setCurrentVote(new Vote(self.getId(), + self.getLastLoggedZxid())); + // We are going to look for a leader by casting a vote for ourself + byte requestBytes[] = new byte[4]; + ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); + byte responseBytes[] = new byte[28]; + ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes); + /* The current vote for the leader. Initially me! */ + DatagramSocket s = null; + try { + s = new DatagramSocket(); + s.setSoTimeout(200); + } catch (SocketException e1) { + e1.printStackTrace(); + System.exit(4); + } + DatagramPacket requestPacket = new DatagramPacket(requestBytes, + requestBytes.length); + DatagramPacket responsePacket = new DatagramPacket(responseBytes, + responseBytes.length); + HashMap votes = + new HashMap(self.quorumPeers.size()); + int xid = epochGen.nextInt(); + while (self.running) { + votes.clear(); + requestBuffer.clear(); + requestBuffer.putInt(xid); + requestPacket.setLength(4); + HashSet heardFrom = new HashSet(); + for (QuorumServer server : self.quorumPeers.values()) { + LOG.info("Server address: " + server.addr); + try { + requestPacket.setSocketAddress(server.addr); + } catch (IllegalArgumentException e) { + // Sun doesn't include the address that causes this + // exception to be thrown, so we wrap the exception + // in order to capture this critical detail. + throw new IllegalArgumentException( + "Unable to set socket address on packet, msg:" + + e.getMessage() + " with addr:" + server.addr, + e); } - responseBuffer.clear(); - int recvedXid = responseBuffer.getInt(); - if (recvedXid != xid) { - LOG.error("Got bad xid: expected " + xid - + " got " + recvedXid); - continue; + + try { + s.send(requestPacket); + responsePacket.setLength(responseBytes.length); + s.receive(responsePacket); + if (responsePacket.getLength() != responseBytes.length) { + LOG.error("Got a short response: " + + responsePacket.getLength()); + continue; + } + responseBuffer.clear(); + int recvedXid = responseBuffer.getInt(); + if (recvedXid != xid) { + LOG.error("Got bad xid: expected " + xid + + " got " + recvedXid); + continue; + } + long peerId = responseBuffer.getLong(); + heardFrom.add(peerId); + //if(server.id != peerId){ + Vote vote = new Vote(responseBuffer.getLong(), + responseBuffer.getLong()); + InetSocketAddress addr = + (InetSocketAddress) responsePacket + .getSocketAddress(); + votes.put(addr, vote); + //} + } catch (IOException e) { + LOG.warn("Ignoring exception while looking for leader", + e); + // Errors are okay, since hosts may be + // down } - long peerId = responseBuffer.getLong(); - heardFrom.add(peerId); - //if(server.id != peerId){ - Vote vote = new Vote(responseBuffer.getLong(), - responseBuffer.getLong()); - InetSocketAddress addr = (InetSocketAddress) responsePacket - .getSocketAddress(); - votes.put(addr, vote); - //} - } catch (IOException e) { - LOG.warn("Ignoring exception while looking for leader", e); - // Errors are okay, since hosts may be - // down } - } - ElectionResult result = countVotes(votes, heardFrom); - if (result.winner.id >= 0) { - self.setCurrentVote(result.vote); - if (result.winningCount > (self.quorumPeers.size() / 2)) { - self.setCurrentVote(result.winner); - s.close(); - Vote current = self.getCurrentVote(); - self.setPeerState((current.id == self.getId()) - ? ServerState.LEADING: ServerState.FOLLOWING); - if (self.getPeerState() == ServerState.FOLLOWING) { - Thread.sleep(100); + ElectionResult result = countVotes(votes, heardFrom); + if (result.winner.id >= 0) { + self.setCurrentVote(result.vote); + if (result.winningCount > (self.quorumPeers.size() / 2)) { + self.setCurrentVote(result.winner); + s.close(); + Vote current = self.getCurrentVote(); + self.setPeerState((current.id == self.getId()) + ? ServerState.LEADING: ServerState.FOLLOWING); + if (self.getPeerState() == ServerState.FOLLOWING) { + Thread.sleep(100); + } + return current; } - return current; } + Thread.sleep(1000); + } + return null; + } finally { + try { + if(self.jmxLeaderElectionBean != null){ + MBeanRegistry.getInstance().unregister( + self.jmxLeaderElectionBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); } - Thread.sleep(1000); + self.jmxLeaderElectionBean = null; } - return null; } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Apr 16 22:59:49 2009 @@ -366,14 +366,7 @@ } protected Election makeLEStrategy(){ - LOG.debug("Running leader election protocol..."); - try { - jmxLeaderElectionBean = new LeaderElectionBean(); - MBeanRegistry.getInstance().register(jmxLeaderElectionBean, jmxLocalPeerBean); - } catch (Exception e) { - LOG.warn("Failed to register with JMX", e); - jmxLeaderElectionBean = null; - } + LOG.debug("Initializing leader election protocol..."); if(electionAlg==null) return new LeaderElection(this); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java Thu Apr 16 22:59:49 2009 @@ -91,7 +91,7 @@ LOG.info("starting up the zookeeper server .. waiting"); assertTrue("waiting for server being up", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); - ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this); + ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); for (int i =0; i < 2000; i++) { zk.create("/crctest- " + i , ("/crctest- " + i).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java Thu Apr 16 22:59:49 2009 @@ -58,7 +58,7 @@ f.startup(zks); assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); - ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this); + ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); for (int i=0; i< 2000; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Thu Apr 16 22:59:49 2009 @@ -20,13 +20,20 @@ import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.io.LineNumberReader; +import java.io.StringReader; +import java.util.regex.Pattern; import junit.framework.TestCase; +import org.apache.log4j.Layout; +import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.apache.log4j.WriterAppender; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -35,6 +42,7 @@ import org.apache.zookeeper.test.ClientBase; import org.junit.Test; + /** * Test stand-alone server. * @@ -159,6 +167,67 @@ ClientBase.CONNECTION_TIMEOUT)); } + /** + * Verify handling of bad quorum address + */ + @Test + public void testBadPeerAddressInQuorum() throws Exception { + LOG.info("STARTING " + getName()); + ClientBase.setupTestEnv(); + + // setup the logger to capture all logs + Layout layout = + Logger.getRootLogger().getAppender("CONSOLE").getLayout(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + WriterAppender appender = new WriterAppender(layout, os); + appender.setThreshold(Level.WARN); + Logger.getLogger(org.apache.zookeeper.server.quorum.QuorumPeer.class) + .addAppender(appender); + + try { + final int CLIENT_PORT_QP1 = 3181; + final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3; + + String quorumCfgSection = + "server.1=localhost:" + (CLIENT_PORT_QP1 + 1) + + ":" + (CLIENT_PORT_QP1 + 2) + + "\nserver.2=fee.fii.foo.fum:" + (CLIENT_PORT_QP2 + 1) + + ":" + (CLIENT_PORT_QP2 + 2); + + MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection); + q1.start(); + + boolean isup = + ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP1, + 5000); + + assertFalse(isup); + + q1.shutdown(); + + assertTrue("waiting for server 1 down", + ClientBase.waitForServerDown("localhost:" + CLIENT_PORT_QP1, + ClientBase.CONNECTION_TIMEOUT)); + + } finally { + Logger.getLogger(org.apache.zookeeper.server.quorum.QuorumPeer.class) + .removeAppender(appender); + } + + LineNumberReader r = new LineNumberReader(new StringReader(os.toString())); + String line; + boolean found = false; + Pattern p = + Pattern.compile(".*IllegalArgumentException.*fee.fii.foo.fum.*"); + while ((line = r.readLine()) != null) { + found = p.matcher(line).matches(); + if (found) { + break; + } + } + assertTrue("complains about host", found); + } + public void process(WatchedEvent event) { // ignore for this test } Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java Thu Apr 16 22:59:49 2009 @@ -71,7 +71,7 @@ LOG.info("starting up the zookeeper server .. waiting"); assertTrue("waiting for server being up", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); - ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this); + ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); String path; LOG.info("starting creating acls"); for (int i = 0; i < 100; i++) { Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java Thu Apr 16 22:59:49 2009 @@ -91,7 +91,7 @@ throws IOException, InterruptedException { CountdownWatcher watcher = new CountdownWatcher(); - ZooKeeper zk = new ZooKeeper(hp, 30000, watcher); + ZooKeeper zk = new ZooKeeper(hp, CONNECTION_TIMEOUT, watcher); if(!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) { @@ -117,7 +117,7 @@ public void run() { try { - zk = new ZooKeeper(qb.hostPort, 30000, this); + zk = new ZooKeeper(qb.hostPort, CONNECTION_TIMEOUT, this); while(bang) { incOutstanding(); // before create otw race zk.create("/test-", new byte[0], Ids.OPEN_ACL_UNSAFE, Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Thu Apr 16 22:59:49 2009 @@ -129,7 +129,8 @@ protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp) throws IOException, InterruptedException { - TestableZooKeeper zk = new TestableZooKeeper(hp, 9000, watcher); + TestableZooKeeper zk = + new TestableZooKeeper(hp, CONNECTION_TIMEOUT, watcher); if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) { Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/IntegrityCheck.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/IntegrityCheck.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/IntegrityCheck.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/IntegrityCheck.java Thu Apr 16 22:59:49 2009 @@ -82,7 +82,7 @@ IntegrityCheck(String hostPort, String path, int count) throws IOException { - zk = new ZooKeeper(hostPort, 15000, this); + zk = new ZooKeeper(hostPort, 30000, this); this.path = path; this.count = count; } Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java Thu Apr 16 22:59:49 2009 @@ -104,7 +104,8 @@ } private void utestExists() throws IOException, InterruptedException, KeeperException { - ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this); + ZooKeeper zk = + new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this); for (int i = 0; i < 10000; i++) { zk.exists("/this/path/doesnt_exist!", true); } @@ -113,7 +114,8 @@ private void utestPrep() throws IOException, InterruptedException, KeeperException { - ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this); + ZooKeeper zk = + new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this); for (int i = 0; i < 10000; i++) { zk.create("/" + i, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @@ -121,7 +123,8 @@ } private void utestGet() throws IOException, InterruptedException, KeeperException { - ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this); + ZooKeeper zk = + new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this); for (int i = 0; i < 10000; i++) { Stat stat = new Stat(); zk.getData("/" + i, true, stat); @@ -130,7 +133,8 @@ } private void utestChildren() throws IOException, InterruptedException, KeeperException { - ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this); + ZooKeeper zk = + new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this); for (int i = 0; i < 10000; i++) { zk.getChildren("/" + i, true); } Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java Thu Apr 16 22:59:49 2009 @@ -58,7 +58,7 @@ f.startup(zks); assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); - ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this); + ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); for (int i=0; i< 2000; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java Thu Apr 16 22:59:49 2009 @@ -87,7 +87,7 @@ CONNECTION_TIMEOUT)); startSignal = new CountDownLatch(1); - ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this); + ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); startSignal.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); assertTrue("count == 0", startSignal.getCount() == 0); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java Thu Apr 16 22:59:49 2009 @@ -73,7 +73,7 @@ LOG.info("starting up the zookeeper server .. waiting"); assertTrue("waiting for server being up", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); - ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this); + ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); Stat stat = zk.exists("/", false); List children = zk.getChildren("/", false); Collections.sort(children); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java?rev=765797&r1=765796&r2=765797&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java Thu Apr 16 22:59:49 2009 @@ -108,7 +108,7 @@ protected ZooKeeper createClient(Watcher watcher, CountDownLatch latch) throws IOException, InterruptedException { - ZooKeeper zk = new ZooKeeper(hostPort, 20000, watcher); + ZooKeeper zk = new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher); if(!latch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){ fail("Unable to connect to server"); }