Author: mahadev
Date: Thu Apr 16 22:59:49 2009
New Revision: 765797
URL: http://svn.apache.org/viewvc?rev=765797&view=rev
Log:
ZOOKEEPER-337. improve logging in leader election lookForLeader method when address resolution fails (phunt via mahadev)
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/IntegrityCheck.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Thu Apr 16 22:59:49 2009
@@ -51,6 +51,9 @@
ZOOKEEPER-374. Uninitialized struct variable in C causes warning which
is treated as an error (phunt via mahadev)
+ ZOOKEEPER-337. improve logging in leader election lookForLeader method when
+address resolution fails (phunt via mahadev)
+
IMPROVEMENTS:
ZOOKEEPER-308. improve the atomic broadcast performance 3x.
(breed via mahadev)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Thu Apr 16 22:59:49 2009
@@ -32,6 +32,7 @@
import org.apache.log4j.Logger;
+import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.quorum.Election;
import org.apache.zookeeper.server.quorum.Vote;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -452,7 +453,17 @@
requestBuffer.put(zeroes);
requestPacket.setLength(48);
- requestPacket.setSocketAddress(m.addr);
+ try {
+ requestPacket.setSocketAddress(m.addr);
+ } catch (IllegalArgumentException e) {
+ // Sun doesn't include the address that causes this
+ // exception to be thrown, so we wrap the exception
+ // in order to capture this critical detail.
+ throw new IllegalArgumentException(
+ "Unable to set socket address on packet, msg:"
+ + e.getMessage() + " with addr:" + m.addr,
+ e);
+ }
try {
if (challengeMap.get(m.tag) == null) {
@@ -486,7 +497,18 @@
requestBuffer.put(zeroes);
requestPacket.setLength(48);
- requestPacket.setSocketAddress(m.addr);
+ try {
+ requestPacket.setSocketAddress(m.addr);
+ } catch (IllegalArgumentException e) {
+ // Sun doesn't include the address that causes this
+ // exception to be thrown, so we wrap the exception
+ // in order to capture this critical detail.
+ throw new IllegalArgumentException(
+ "Unable to set socket address on packet, msg:"
+ + e.getMessage() + " with addr:" + m.addr,
+ e);
+ }
+
try {
mySocket.send(requestPacket);
@@ -512,7 +534,18 @@
requestBuffer.put(zeroes);
requestPacket.setLength(48);
- requestPacket.setSocketAddress(m.addr);
+ try {
+ requestPacket.setSocketAddress(m.addr);
+ } catch (IllegalArgumentException e) {
+ // Sun doesn't include the address that causes this
+ // exception to be thrown, so we wrap the exception
+ // in order to capture this critical detail.
+ throw new IllegalArgumentException(
+ "Unable to set socket address on packet, msg:"
+ + e.getMessage() + " with addr:" + m.addr,
+ e);
+ }
+
boolean myChallenge = false;
boolean myAck = false;
@@ -629,7 +662,18 @@
requestBuffer.putLong(m.epoch);
requestPacket.setLength(48);
- requestPacket.setSocketAddress(m.addr);
+ try {
+ requestPacket.setSocketAddress(m.addr);
+ } catch (IllegalArgumentException e) {
+ // Sun doesn't include the address that causes this
+ // exception to be thrown, so we wrap the exception
+ // in order to capture this critical detail.
+ throw new IllegalArgumentException(
+ "Unable to set socket address on packet, msg:"
+ + e.getMessage() + " with addr:" + m.addr,
+ e);
+ }
+
try {
mySocket.send(requestPacket);
@@ -764,129 +808,152 @@
}
public Vote lookForLeader() throws InterruptedException {
- HashMap<InetSocketAddress, Vote> recvset =
- new HashMap<InetSocketAddress, Vote>();
-
- HashMap<InetSocketAddress, Vote> outofelection =
- new HashMap<InetSocketAddress, Vote>();
-
- logicalclock++;
-
- proposedLeader = self.getId();
- proposedZxid = self.getLastLoggedZxid();
-
- LOG.info("Election tally");
- sendNotifications();
-
- /*
- * Loop in which we exchange notifications until we find a leader
- */
-
- while (self.getPeerState() == ServerState.LOOKING) {
- /*
- * Remove next notification from queue, times out after 2 times the
- * termination time
- */
- Notification n = recvqueue.poll(2 * finalizeWait,
- TimeUnit.MILLISECONDS);
+ try {
+ self.jmxLeaderElectionBean = new LeaderElectionBean();
+ MBeanRegistry.getInstance().register(
+ self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ self.jmxLeaderElectionBean = null;
+ }
+ try {
+ HashMap<InetSocketAddress, Vote> recvset =
+ new HashMap<InetSocketAddress, Vote>();
+
+ HashMap<InetSocketAddress, Vote> outofelection =
+ new HashMap<InetSocketAddress, Vote>();
+
+ logicalclock++;
+
+ proposedLeader = self.getId();
+ proposedZxid = self.getLastLoggedZxid();
+
+ LOG.info("Election tally");
+ sendNotifications();
+
/*
- * Sends more notifications if haven't received enough. Otherwise
- * processes new notification.
+ * Loop in which we exchange notifications until we find a leader
*/
- if (n == null) {
- if (((!outofelection.isEmpty()) || (recvset.size() > 1)))
- sendNotifications();
- } else
- switch (n.state) {
- case LOOKING:
- if (n.epoch > logicalclock) {
- logicalclock = n.epoch;
- recvset.clear();
- if (totalOrderPredicate(n.leader, n.zxid)) {
+
+ while (self.getPeerState() == ServerState.LOOKING) {
+ /*
+ * Remove next notification from queue, times out after 2 times
+ * the termination time
+ */
+ Notification n = recvqueue.poll(2 * finalizeWait,
+ TimeUnit.MILLISECONDS);
+
+ /*
+ * Sends more notifications if haven't received enough.
+ * Otherwise processes new notification.
+ */
+ if (n == null) {
+ if (((!outofelection.isEmpty()) || (recvset.size() > 1)))
+ sendNotifications();
+ } else
+ switch (n.state) {
+ case LOOKING:
+ if (n.epoch > logicalclock) {
+ logicalclock = n.epoch;
+ recvset.clear();
+ if (totalOrderPredicate(n.leader, n.zxid)) {
+ proposedLeader = n.leader;
+ proposedZxid = n.zxid;
+ }
+ sendNotifications();
+ } else if (n.epoch < logicalclock) {
+ break;
+ } else if (totalOrderPredicate(n.leader, n.zxid)) {
proposedLeader = n.leader;
proposedZxid = n.zxid;
+
+ sendNotifications();
}
- sendNotifications();
- } else if (n.epoch < logicalclock) {
- break;
- } else if (totalOrderPredicate(n.leader, n.zxid)) {
- proposedLeader = n.leader;
- proposedZxid = n.zxid;
-
- sendNotifications();
- }
-
- recvset.put(n.addr, new Vote(n.leader, n.zxid));
-
- // If have received from all nodes, then terminate
- if (self.quorumPeers.size() == recvset.size()) {
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
- // if (self.state == ServerState.FOLLOWING) {
- // Thread.sleep(100);
- // }
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
-
- } else if (termPredicate(recvset, proposedLeader,
- proposedZxid)) {
- // Otherwise, wait for a fixed amount of time
- LOG.info("Passed predicate");
- Thread.sleep(finalizeWait);
-
- // Notification probe = recvqueue.peek();
-
- // Verify if there is any change in the proposed leader
- while ((!recvqueue.isEmpty())
- && !totalOrderPredicate(
- recvqueue.peek().leader, recvqueue
- .peek().zxid)) {
- recvqueue.poll();
- }
- if (recvqueue.isEmpty()) {
- // LOG.warn("Proposed leader: " +
- // proposedLeader);
+
+ recvset.put(n.addr, new Vote(n.leader, n.zxid));
+
+ // If have received from all nodes, then terminate
+ if (self.quorumPeers.size() == recvset.size()) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: ServerState.FOLLOWING);
// if (self.state == ServerState.FOLLOWING) {
// Thread.sleep(100);
// }
-
leaveInstance();
return new Vote(proposedLeader, proposedZxid);
+
+ } else if (termPredicate(recvset, proposedLeader,
+ proposedZxid)) {
+ // Otherwise, wait for a fixed amount of time
+ LOG.info("Passed predicate");
+ Thread.sleep(finalizeWait);
+
+ // Notification probe = recvqueue.peek();
+
+ // Verify if there is any change in the proposed leader
+ while ((!recvqueue.isEmpty())
+ && !totalOrderPredicate(
+ recvqueue.peek().leader, recvqueue
+ .peek().zxid)) {
+ recvqueue.poll();
+ }
+ if (recvqueue.isEmpty()) {
+ // LOG.warn("Proposed leader: " +
+ // proposedLeader);
+ self.setPeerState(
+ (proposedLeader == self.getId()) ?
+ ServerState.LEADING :
+ ServerState.FOLLOWING);
+ // if (self.state == ServerState.FOLLOWING) {
+ // Thread.sleep(100);
+ // }
+
+ leaveInstance();
+ return new Vote(proposedLeader, proposedZxid);
+ }
}
+ break;
+ case LEADING:
+ outofelection.put(n.addr, new Vote(n.leader, n.zxid));
+
+ if (termPredicate(outofelection, n.leader, n.zxid)) {
+
+ self.setPeerState((n.leader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
+
+ leaveInstance();
+ return new Vote(n.leader, n.zxid);
+ }
+ break;
+ case FOLLOWING:
+ outofelection.put(n.addr, new Vote(n.leader, n.zxid));
+
+ if (termPredicate(outofelection, n.leader, n.zxid)) {
+
+ self.setPeerState((n.leader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
+
+ leaveInstance();
+ return new Vote(n.leader, n.zxid);
+ }
+ break;
+ default:
+ break;
}
- break;
- case LEADING:
- outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
- if (termPredicate(outofelection, n.leader, n.zxid)) {
-
- self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- break;
- case FOLLOWING:
- outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
- if (termPredicate(outofelection, n.leader, n.zxid)) {
-
- self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- break;
- default:
- break;
+ }
+
+ return null;
+ } finally {
+ try {
+ if(self.jmxLeaderElectionBean != null){
+ MBeanRegistry.getInstance().unregister(
+ self.jmxLeaderElectionBean);
}
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ self.jmxLeaderElectionBean = null;
}
-
- return null;
}
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Thu Apr 16 22:59:49 2009
@@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
@@ -510,146 +511,183 @@
* sends notifications to al other peers.
*/
public Vote lookForLeader() throws InterruptedException {
- HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
-
- HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
-
- int notTimeout = finalizeWait;
-
- synchronized(this){
- logicalclock++;
- updateProposal(self.getId(), self.getLastLoggedZxid());
+ try {
+ self.jmxLeaderElectionBean = new LeaderElectionBean();
+ MBeanRegistry.getInstance().register(
+ self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ self.jmxLeaderElectionBean = null;
}
-
- LOG.info("New election: " + proposedZxid);
- sendNotifications();
-
- /*
- * Loop in which we exchange notifications until we find a leader
- */
- while (self.getPeerState() == ServerState.LOOKING) {
- /*
- * Remove next notification from queue, times out after 2 times
- * the termination time
- */
- Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
+ try {
+ HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
+
+ HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
+
+ int notTimeout = finalizeWait;
+
+ synchronized(this){
+ logicalclock++;
+ updateProposal(self.getId(), self.getLastLoggedZxid());
+ }
+ LOG.info("New election: " + proposedZxid);
+ sendNotifications();
+
/*
- * Sends more notifications if haven't received enough.
- * Otherwise processes new notification.
+ * Loop in which we exchange notifications until we find a leader
*/
- if(n == null){
- if(manager.haveDelivered()){
- sendNotifications();
- } else {
- manager.connectAll();
- }
-
- /*
- * Exponential backoff
- */
- int tmpTimeOut = notTimeout*2;
- notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
- LOG.info("Notification time out: " + notTimeout);
- }
- else {
- //notTimeout = finalizeWait;
- switch (n.state) {
- case LOOKING:
- // If notification > current, replace and send messages out
- LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " +
- n.epoch + ", " + self.getId() + ", " + self.getPeerState() +
- ", " + n.state + ", " + n.sid);
- if (n.epoch > logicalclock) {
- LOG.debug("Increasing logical clock: " + n.epoch);
- logicalclock = n.epoch;
- recvset.clear();
- if(totalOrderPredicate(n.leader, n.zxid, self.getId(), self.getLastLoggedZxid()))
- updateProposal(n.leader, n.zxid);
- else
- updateProposal(self.getId(), self.getLastLoggedZxid());
- sendNotifications();
- } else if (n.epoch < logicalclock) {
- LOG.info("n.epoch < logicalclock");
- break;
- } else if (totalOrderPredicate(n.leader, n.zxid, proposedLeader, proposedZxid)) {
- LOG.info("Updating proposal");
- updateProposal(n.leader, n.zxid);
- sendNotifications();
- }
+
+ while (self.getPeerState() == ServerState.LOOKING) {
+ /*
+ * Remove next notification from queue, times out after 2 times
+ * the termination time
+ */
+ Notification n = recvqueue.poll(notTimeout,
+ TimeUnit.MILLISECONDS);
- LOG.info("Adding vote");
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
-
- //If have received from all nodes, then terminate
- if (self.quorumPeers.size() == recvset.size()) {
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
-
- } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) {
- //Otherwise, wait for a fixed amount of time
- LOG.debug("Passed predicate");
-
- // Verify if there is any change in the proposed leader
- while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
- if(totalOrderPredicate(n.leader, n.zxid, proposedLeader, proposedZxid)){
- recvqueue.put(n);
- break;
- }
+ /*
+ * Sends more notifications if haven't received enough.
+ * Otherwise processes new notification.
+ */
+ if(n == null){
+ if(manager.haveDelivered()){
+ sendNotifications();
+ } else {
+ manager.connectAll();
+ }
+
+ /*
+ * Exponential backoff
+ */
+ int tmpTimeOut = notTimeout*2;
+ notTimeout = (tmpTimeOut < maxNotificationInterval?
+ tmpTimeOut : maxNotificationInterval);
+ LOG.info("Notification time out: " + notTimeout);
+ }
+ else {
+ //notTimeout = finalizeWait;
+ switch (n.state) {
+ case LOOKING:
+ // If notification > current, replace and send messages out
+ LOG.info("Notification: " + n.leader + ", " + n.zxid
+ + ", " + n.epoch + ", " + self.getId() + ", "
+ + self.getPeerState() + ", " + n.state + ", "
+ + n.sid);
+ if (n.epoch > logicalclock) {
+ logicalclock = n.epoch;
+ recvset.clear();
+ if(totalOrderPredicate(n.leader, n.zxid,
+ self.getId(), self.getLastLoggedZxid()))
+ updateProposal(n.leader, n.zxid);
+ else
+ updateProposal(self.getId(),
+ self.getLastLoggedZxid());
+ sendNotifications();
+ } else if (n.epoch < logicalclock) {
+ LOG.info("n.epoch < logicalclock");
+ break;
+ } else if (totalOrderPredicate(n.leader, n.zxid,
+ proposedLeader, proposedZxid)) {
+ LOG.info("Updating proposal");
+ updateProposal(n.leader, n.zxid);
+ sendNotifications();
}
- if (n == null) {
+ LOG.info("Adding vote");
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
+
+ //If have received from all nodes, then terminate
+ if (self.quorumPeers.size() == recvset.size()) {
self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
- LOG.info("About to leave instance:" + proposedLeader + ", " +
- proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
+ ServerState.LEADING: ServerState.FOLLOWING);
leaveInstance();
- return new Vote(proposedLeader,
- proposedZxid);
+ return new Vote(proposedLeader, proposedZxid);
+
+ } else if (termPredicate(recvset,
+ new Vote(proposedLeader, proposedZxid,
+ logicalclock))) {
+ //Otherwise, wait for a fixed amount of time
+ LOG.debug("Passed predicate");
+
+ // Verify if there is any change in the proposed leader
+ while((n = recvqueue.poll(finalizeWait,
+ TimeUnit.MILLISECONDS)) != null){
+ if(totalOrderPredicate(n.leader, n.zxid,
+ proposedLeader, proposedZxid)){
+ recvqueue.put(n);
+ break;
+ }
+ }
+
+ if (n == null) {
+ self.setPeerState((proposedLeader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
+ LOG.info("About to leave instance:"
+ + proposedLeader + ", " +
+ proposedZxid + ", " + self.getId()
+ + ", " + self.getPeerState());
+ leaveInstance();
+ return new Vote(proposedLeader,
+ proposedZxid);
+ }
}
- }
- break;
- case LEADING:
- /*
- * There is at most one leader for each epoch, so if a peer claims to
- * be the leader for an epoch, then that peer must be the leader (no
- * arbitrary failures assumed). Now, if there is no quorum supporting
- * this leader, then processes will naturally move to a new epoch.
- */
- if(n.epoch == logicalclock){
- self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- case FOLLOWING:
- LOG.info("Notification: " + n.leader + ", " + n.zxid +
- ", " + n.epoch + ", " + self.getId() + ", " +
- self.getPeerState() + ", " + n.state + ", " + n.sid);
-
- outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));
-
- if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state))
- && checkLeader(outofelection, n.leader, n.epoch)) {
- synchronized(this){
- logicalclock = n.epoch;
+ break;
+ case LEADING:
+ /*
+ * There is at most one leader for each epoch, so if a
+ * peer claims to be the leader for an epoch, then that
+ * peer must be the leader (no arbitrary failures
+ * assumed). Now, if there is no quorum supporting
+ * this leader, then processes will naturally move
+ * to a new epoch.
+ */
+ if(n.epoch == logicalclock){
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: ServerState.FOLLOWING);
+
+ leaveInstance();
+ return new Vote(n.leader, n.zxid);
+ }
+ case FOLLOWING:
+ LOG.info("Notification: " + n.leader + ", " + n.zxid +
+ ", " + n.epoch + ", " + self.getId() + ", " +
+ self.getPeerState() + ", " + n.state + ", "
+ + n.sid);
+
+ outofelection.put(n.sid, new Vote(n.leader, n.zxid,
+ n.epoch, n.state));
+
+ if (termPredicate(outofelection, new Vote(n.leader,
+ n.zxid, n.epoch, n.state))
+ && checkLeader(outofelection, n.leader, n.epoch)) {
+ synchronized(this){
+ logicalclock = n.epoch;
+ self.setPeerState((n.leader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
+ }
+ leaveInstance();
+ return new Vote(n.leader, n.zxid);
}
- leaveInstance();
- return new Vote(n.leader, n.zxid);
+ break;
+ default:
+ break;
}
- break;
- default:
- break;
}
}
+
+ return null;
+ } finally {
+ try {
+ if(self.jmxLeaderElectionBean != null){
+ MBeanRegistry.getInstance().unregister(
+ self.jmxLeaderElectionBean);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ self.jmxLeaderElectionBean = null;
}
-
- return null;
}
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Thu Apr 16 22:59:49 2009
@@ -33,6 +33,7 @@
import org.apache.log4j.Logger;
+import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.quorum.Vote;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
@@ -40,7 +41,7 @@
public class LeaderElection implements Election {
private static final Logger LOG = Logger.getLogger(LeaderElection.class);
private static Random epochGen = new Random();
-
+
QuorumPeer self;
public LeaderElection(QuorumPeer self) {
@@ -80,7 +81,7 @@
}
}
}
-
+
HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>();
// Now do the tally
for (Vote v : votesCast) {
@@ -110,85 +111,120 @@
}
public Vote lookForLeader() throws InterruptedException {
- self.setCurrentVote(new Vote(self.getId(), self.getLastLoggedZxid()));
- // We are going to look for a leader by casting a vote for ourself
- byte requestBytes[] = new byte[4];
- ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
- byte responseBytes[] = new byte[28];
- ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
- /* The current vote for the leader. Initially me! */
- DatagramSocket s = null;
try {
- s = new DatagramSocket();
- s.setSoTimeout(200);
- } catch (SocketException e1) {
- e1.printStackTrace();
- System.exit(4);
+ self.jmxLeaderElectionBean = new LeaderElectionBean();
+ MBeanRegistry.getInstance().register(
+ self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ self.jmxLeaderElectionBean = null;
}
- DatagramPacket requestPacket = new DatagramPacket(requestBytes,
- requestBytes.length);
- DatagramPacket responsePacket = new DatagramPacket(responseBytes,
- responseBytes.length);
- HashMap<InetSocketAddress, Vote> votes = new HashMap<InetSocketAddress, Vote>(
- self.quorumPeers.size());
- int xid = epochGen.nextInt();
- while (self.running) {
- votes.clear();
- requestBuffer.clear();
- requestBuffer.putInt(xid);
- requestPacket.setLength(4);
- HashSet<Long> heardFrom = new HashSet<Long>();
- for (QuorumServer server : self.quorumPeers.values()) {
- requestPacket.setSocketAddress(server.addr);
- LOG.info("Server address: " + server.addr);
- try {
- s.send(requestPacket);
- responsePacket.setLength(responseBytes.length);
- s.receive(responsePacket);
- if (responsePacket.getLength() != responseBytes.length) {
- LOG.error("Got a short response: "
- + responsePacket.getLength());
- continue;
+
+ try {
+ self.setCurrentVote(new Vote(self.getId(),
+ self.getLastLoggedZxid()));
+ // We are going to look for a leader by casting a vote for ourself
+ byte requestBytes[] = new byte[4];
+ ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+ byte responseBytes[] = new byte[28];
+ ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
+ /* The current vote for the leader. Initially me! */
+ DatagramSocket s = null;
+ try {
+ s = new DatagramSocket();
+ s.setSoTimeout(200);
+ } catch (SocketException e1) {
+ e1.printStackTrace();
+ System.exit(4);
+ }
+ DatagramPacket requestPacket = new DatagramPacket(requestBytes,
+ requestBytes.length);
+ DatagramPacket responsePacket = new DatagramPacket(responseBytes,
+ responseBytes.length);
+ HashMap<InetSocketAddress, Vote> votes =
+ new HashMap<InetSocketAddress, Vote>(self.quorumPeers.size());
+ int xid = epochGen.nextInt();
+ while (self.running) {
+ votes.clear();
+ requestBuffer.clear();
+ requestBuffer.putInt(xid);
+ requestPacket.setLength(4);
+ HashSet<Long> heardFrom = new HashSet<Long>();
+ for (QuorumServer server : self.quorumPeers.values()) {
+ LOG.info("Server address: " + server.addr);
+ try {
+ requestPacket.setSocketAddress(server.addr);
+ } catch (IllegalArgumentException e) {
+ // Sun doesn't include the address that causes this
+ // exception to be thrown, so we wrap the exception
+ // in order to capture this critical detail.
+ throw new IllegalArgumentException(
+ "Unable to set socket address on packet, msg:"
+ + e.getMessage() + " with addr:" + server.addr,
+ e);
}
- responseBuffer.clear();
- int recvedXid = responseBuffer.getInt();
- if (recvedXid != xid) {
- LOG.error("Got bad xid: expected " + xid
- + " got " + recvedXid);
- continue;
+
+ try {
+ s.send(requestPacket);
+ responsePacket.setLength(responseBytes.length);
+ s.receive(responsePacket);
+ if (responsePacket.getLength() != responseBytes.length) {
+ LOG.error("Got a short response: "
+ + responsePacket.getLength());
+ continue;
+ }
+ responseBuffer.clear();
+ int recvedXid = responseBuffer.getInt();
+ if (recvedXid != xid) {
+ LOG.error("Got bad xid: expected " + xid
+ + " got " + recvedXid);
+ continue;
+ }
+ long peerId = responseBuffer.getLong();
+ heardFrom.add(peerId);
+ //if(server.id != peerId){
+ Vote vote = new Vote(responseBuffer.getLong(),
+ responseBuffer.getLong());
+ InetSocketAddress addr =
+ (InetSocketAddress) responsePacket
+ .getSocketAddress();
+ votes.put(addr, vote);
+ //}
+ } catch (IOException e) {
+ LOG.warn("Ignoring exception while looking for leader",
+ e);
+ // Errors are okay, since hosts may be
+ // down
}
- long peerId = responseBuffer.getLong();
- heardFrom.add(peerId);
- //if(server.id != peerId){
- Vote vote = new Vote(responseBuffer.getLong(),
- responseBuffer.getLong());
- InetSocketAddress addr = (InetSocketAddress) responsePacket
- .getSocketAddress();
- votes.put(addr, vote);
- //}
- } catch (IOException e) {
- LOG.warn("Ignoring exception while looking for leader", e);
- // Errors are okay, since hosts may be
- // down
}
- }
- ElectionResult result = countVotes(votes, heardFrom);
- if (result.winner.id >= 0) {
- self.setCurrentVote(result.vote);
- if (result.winningCount > (self.quorumPeers.size() / 2)) {
- self.setCurrentVote(result.winner);
- s.close();
- Vote current = self.getCurrentVote();
- self.setPeerState((current.id == self.getId())
- ? ServerState.LEADING: ServerState.FOLLOWING);
- if (self.getPeerState() == ServerState.FOLLOWING) {
- Thread.sleep(100);
+ ElectionResult result = countVotes(votes, heardFrom);
+ if (result.winner.id >= 0) {
+ self.setCurrentVote(result.vote);
+ if (result.winningCount > (self.quorumPeers.size() / 2)) {
+ self.setCurrentVote(result.winner);
+ s.close();
+ Vote current = self.getCurrentVote();
+ self.setPeerState((current.id == self.getId())
+ ? ServerState.LEADING: ServerState.FOLLOWING);
+ if (self.getPeerState() == ServerState.FOLLOWING) {
+ Thread.sleep(100);
+ }
+ return current;
}
- return current;
}
+ Thread.sleep(1000);
+ }
+ return null;
+ } finally {
+ try {
+ if(self.jmxLeaderElectionBean != null){
+ MBeanRegistry.getInstance().unregister(
+ self.jmxLeaderElectionBean);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
}
- Thread.sleep(1000);
+ self.jmxLeaderElectionBean = null;
}
- return null;
}
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Apr 16 22:59:49 2009
@@ -366,14 +366,7 @@
}
protected Election makeLEStrategy(){
- LOG.debug("Running leader election protocol...");
- try {
- jmxLeaderElectionBean = new LeaderElectionBean();
- MBeanRegistry.getInstance().register(jmxLeaderElectionBean, jmxLocalPeerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxLeaderElectionBean = null;
- }
+ LOG.debug("Initializing leader election protocol...");
if(electionAlg==null)
return new LeaderElection(this);
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java Thu Apr 16 22:59:49 2009
@@ -91,7 +91,7 @@
LOG.info("starting up the zookeeper server .. waiting");
assertTrue("waiting for server being up",
ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
- ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+ ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
for (int i =0; i < 2000; i++) {
zk.create("/crctest- " + i , ("/crctest- " + i).getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java Thu Apr 16 22:59:49 2009
@@ -58,7 +58,7 @@
f.startup(zks);
assertTrue("waiting for server being up ",
ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
- ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+ ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
for (int i=0; i< 2000; i++) {
zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Thu Apr 16 22:59:49 2009
@@ -20,13 +20,20 @@
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.LineNumberReader;
+import java.io.StringReader;
+import java.util.regex.Pattern;
import junit.framework.TestCase;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.apache.log4j.WriterAppender;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -35,6 +42,7 @@
import org.apache.zookeeper.test.ClientBase;
import org.junit.Test;
+
/**
* Test stand-alone server.
*
@@ -159,6 +167,67 @@
ClientBase.CONNECTION_TIMEOUT));
}
+ /**
+ * Verify that a peer whose quorum config contains a bad peer address does not come up and logs a warning naming that host.
+ */
+ @Test
+ public void testBadPeerAddressInQuorum() throws Exception {
+ LOG.info("STARTING " + getName());
+ ClientBase.setupTestEnv();
+
+ // capture WARN-and-above output of the QuorumPeer logger in memory, reusing the CONSOLE appender's layout
+ Layout layout =
+ Logger.getRootLogger().getAppender("CONSOLE").getLayout();
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ WriterAppender appender = new WriterAppender(layout, os);
+ appender.setThreshold(Level.WARN);
+ Logger.getLogger(org.apache.zookeeper.server.quorum.QuorumPeer.class)
+ .addAppender(appender);
+
+ try {
+ final int CLIENT_PORT_QP1 = 3181;
+ final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3;
+
+ String quorumCfgSection =
+ "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
+ + ":" + (CLIENT_PORT_QP1 + 2)
+ + "\nserver.2=fee.fii.foo.fum:" + (CLIENT_PORT_QP2 + 1) // server.2 carries the bad peer address under test
+ + ":" + (CLIENT_PORT_QP2 + 2);
+
+ MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
+ q1.start();
+
+ boolean isup =
+ ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP1,
+ 5000); // fixed 5000 wait; the assertion below expects the server NOT to be up
+
+ assertFalse(isup);
+
+ q1.shutdown();
+
+ assertTrue("waiting for server 1 down",
+ ClientBase.waitForServerDown("localhost:" + CLIENT_PORT_QP1,
+ ClientBase.CONNECTION_TIMEOUT));
+
+ } finally { // detach the capture appender whether or not the assertions above passed
+ Logger.getLogger(org.apache.zookeeper.server.quorum.QuorumPeer.class)
+ .removeAppender(appender);
+ }
+
+ LineNumberReader r = new LineNumberReader(new StringReader(os.toString())); // scan the captured log text line by line
+ String line;
+ boolean found = false;
+ Pattern p =
+ Pattern.compile(".*IllegalArgumentException.*fee.fii.foo.fum.*"); // NOTE(review): dots are unescaped, so each '.' matches any character
+ while ((line = r.readLine()) != null) {
+ found = p.matcher(line).matches(); // matches() requires the whole line to fit the pattern
+ if (found) {
+ break;
+ }
+ }
+ assertTrue("complains about host", found);
+ }
+
public void process(WatchedEvent event) {
// ignore for this test
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java Thu Apr 16 22:59:49 2009
@@ -71,7 +71,7 @@
LOG.info("starting up the zookeeper server .. waiting");
assertTrue("waiting for server being up",
ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
- ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+ ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
String path;
LOG.info("starting creating acls");
for (int i = 0; i < 100; i++) {
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java Thu Apr 16 22:59:49 2009
@@ -91,7 +91,7 @@
throws IOException, InterruptedException
{
CountdownWatcher watcher = new CountdownWatcher();
- ZooKeeper zk = new ZooKeeper(hp, 30000, watcher);
+ ZooKeeper zk = new ZooKeeper(hp, CONNECTION_TIMEOUT, watcher);
if(!watcher.clientConnected.await(CONNECTION_TIMEOUT,
TimeUnit.MILLISECONDS))
{
@@ -117,7 +117,7 @@
public void run() {
try {
- zk = new ZooKeeper(qb.hostPort, 30000, this);
+ zk = new ZooKeeper(qb.hostPort, CONNECTION_TIMEOUT, this);
while(bang) {
incOutstanding(); // before create otw race
zk.create("/test-", new byte[0], Ids.OPEN_ACL_UNSAFE,
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Thu Apr 16 22:59:49 2009
@@ -129,7 +129,8 @@
protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp)
throws IOException, InterruptedException
{
- TestableZooKeeper zk = new TestableZooKeeper(hp, 9000, watcher);
+ TestableZooKeeper zk =
+ new TestableZooKeeper(hp, CONNECTION_TIMEOUT, watcher);
if (!watcher.clientConnected.await(CONNECTION_TIMEOUT,
TimeUnit.MILLISECONDS))
{
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/IntegrityCheck.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/IntegrityCheck.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/IntegrityCheck.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/IntegrityCheck.java Thu Apr 16 22:59:49 2009
@@ -82,7 +82,7 @@
IntegrityCheck(String hostPort, String path, int count) throws
IOException {
- zk = new ZooKeeper(hostPort, 15000, this);
+ zk = new ZooKeeper(hostPort, 30000, this);
this.path = path;
this.count = count;
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java Thu Apr 16 22:59:49 2009
@@ -104,7 +104,8 @@
}
private void utestExists() throws IOException, InterruptedException, KeeperException {
- ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this);
+ ZooKeeper zk =
+ new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this);
for (int i = 0; i < 10000; i++) {
zk.exists("/this/path/doesnt_exist!", true);
}
@@ -113,7 +114,8 @@
private void utestPrep() throws IOException,
InterruptedException, KeeperException {
- ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this);
+ ZooKeeper zk =
+ new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this);
for (int i = 0; i < 10000; i++) {
zk.create("/" + i, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@@ -121,7 +123,8 @@
}
private void utestGet() throws IOException, InterruptedException, KeeperException {
- ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this);
+ ZooKeeper zk =
+ new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this);
for (int i = 0; i < 10000; i++) {
Stat stat = new Stat();
zk.getData("/" + i, true, stat);
@@ -130,7 +133,8 @@
}
private void utestChildren() throws IOException, InterruptedException, KeeperException {
- ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this);
+ ZooKeeper zk =
+ new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this);
for (int i = 0; i < 10000; i++) {
zk.getChildren("/" + i, true);
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java Thu Apr 16 22:59:49 2009
@@ -58,7 +58,7 @@
f.startup(zks);
assertTrue("waiting for server being up ",
ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
- ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+ ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
for (int i=0; i< 2000; i++) {
zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java Thu Apr 16 22:59:49 2009
@@ -87,7 +87,7 @@
CONNECTION_TIMEOUT));
startSignal = new CountDownLatch(1);
- ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+ ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
startSignal.await(CONNECTION_TIMEOUT,
TimeUnit.MILLISECONDS);
assertTrue("count == 0", startSignal.getCount() == 0);
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java Thu Apr 16 22:59:49 2009
@@ -73,7 +73,7 @@
LOG.info("starting up the zookeeper server .. waiting");
assertTrue("waiting for server being up",
ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
- ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+ ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
Stat stat = zk.exists("/", false);
List<String> children = zk.getChildren("/", false);
Collections.sort(children);
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java?rev=765797&r1=765796&r2=765797&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java Thu Apr 16 22:59:49 2009
@@ -108,7 +108,7 @@
protected ZooKeeper createClient(Watcher watcher, CountDownLatch latch)
throws IOException, InterruptedException
{
- ZooKeeper zk = new ZooKeeper(hostPort, 20000, watcher);
+ ZooKeeper zk = new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher);
if(!latch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){
fail("Unable to connect to server");
}
|